3 # Copyright 2007 Google Inc.
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
9 # http://www.apache.org/licenses/LICENSE-2.0
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
32 """Defines executor tasks handlers for MapReduce implementation."""
48 from google
.appengine
.ext
import ndb
50 from google
.appengine
import runtime
51 from google
.appengine
.api
import datastore_errors
52 from google
.appengine
.api
import logservice
53 from google
.appengine
.api
import modules
54 from google
.appengine
.api
import taskqueue
55 from google
.appengine
.ext
import db
56 from google
.appengine
.ext
.mapreduce
import base_handler
57 from google
.appengine
.ext
.mapreduce
import context
58 from google
.appengine
.ext
.mapreduce
import errors
59 from google
.appengine
.ext
.mapreduce
import input_readers
60 from google
.appengine
.ext
.mapreduce
import model
61 from google
.appengine
.ext
.mapreduce
import operation
62 from google
.appengine
.ext
.mapreduce
import parameters
63 from google
.appengine
.ext
.mapreduce
import util
64 from google
.appengine
.runtime
import apiproxy_errors
68 from google
.appengine
.ext
import cloudstorage
73 if hasattr(cloudstorage
, "_STUB"):
88 _TEST_INJECTED_FAULTS
= set()
91 def _run_task_hook(hooks
, method
, task
, queue_name
):
92 """Invokes hooks.method(task, queue_name).
95 hooks: A hooks.Hooks instance or None.
96 method: The name of the method to invoke on the hooks class e.g.
97 "enqueue_kickoff_task".
98 task: The taskqueue.Task to pass to the hook method.
99 queue_name: The name of the queue to pass to the hook method.
102 True if the hooks.Hooks instance handled the method, False otherwise.
104 if hooks
is not None:
106 getattr(hooks
, method
)(task
, queue_name
)
107 except NotImplementedError:
115 class MapperWorkerCallbackHandler(base_handler
.HugeTaskHandler
):
116 """Callback handler for mapreduce worker task."""
118 _TASK_DIRECTIVE
= util
._enum
(
120 PROCEED_TASK
="proceed_task",
122 RETRY_TASK
="retry_task",
124 DROP_TASK
="drop_task",
126 RECOVER_SLICE
="recover_slice",
128 RETRY_SHARD
="retry_shard",
130 FAIL_TASK
="fail_task")
def __init__(self, *args):
    """Initialize the worker callback handler.

    Args:
      *args: positional arguments forwarded unchanged to the parent
        handler's constructor.
    """
    super(MapperWorkerCallbackHandler, self).__init__(*args)
    # Clock callable. Other methods call self._time() instead of time.time()
    # directly (presumably so the clock can be replaced in tests).
    self._time = time.time
def _drop_gracefully(self):
    """Drop the worker task gracefully.

    Looks up the ShardState and MapreduceState named by the task headers
    and, if the shard is still active, marks it as failed and saves it.
    Controller logic will take care of other shards and the entire MR.
    """
    request_headers = self.request.headers
    shard_id = request_headers[util._MR_SHARD_ID_TASK_HEADER]
    mr_id = request_headers[util._MR_ID_TASK_HEADER]

    # Fetch both entities with a single batch get.
    entity_keys = [
        model.ShardState.get_key_by_shard_id(shard_id),
        model.MapreduceState.get_key_by_job_id(mr_id),
    ]
    shard_state, mr_state = db.get(entity_keys)

    # Nothing to do when the shard is unknown or already inactive.
    if not shard_state or not shard_state.active:
        return

    shard_state.set_for_failure()
    config = util.create_datastore_write_config(mr_state.mapreduce_spec)
    shard_state.put(config=config)
154 def _try_acquire_lease(self
, shard_state
, tstate
):
155 """Validate datastore and the task payload are consistent.
157 If so, attempt to get a lease on this slice's execution.
158 See model.ShardState doc on slice_start_time.
161 shard_state: model.ShardState from datastore.
162 tstate: model.TransientShardState from taskqueue payload.
165 A _TASK_DIRECTIVE enum. PROCEED_TASK if lock is acquired.
166 RETRY_TASK if task should be retried, DROP_TASK if task should
167 be dropped. Only old tasks (comparing to datastore state)
168 will be dropped. Future tasks are retried until they naturally
169 become old so that we don't ever stuck MR.
173 logging
.warning("State not found for shard %s; Possible spurious task "
174 "execution. Dropping this task.",
176 return self
._TASK
_DIRECTIVE
.DROP_TASK
178 if not shard_state
.active
:
179 logging
.warning("Shard %s is not active. Possible spurious task "
180 "execution. Dropping this task.", tstate
.shard_id
)
181 logging
.warning(str(shard_state
))
182 return self
._TASK
_DIRECTIVE
.DROP_TASK
185 if shard_state
.retries
> tstate
.retries
:
187 "Got shard %s from previous shard retry %s. Possible spurious "
188 "task execution. Dropping this task.",
191 logging
.warning(str(shard_state
))
192 return self
._TASK
_DIRECTIVE
.DROP_TASK
193 elif shard_state
.retries
< tstate
.retries
:
198 "ShardState for %s is behind slice. Waiting for it to catch up",
199 shard_state
.shard_id
)
200 return self
._TASK
_DIRECTIVE
.RETRY_TASK
204 if shard_state
.slice_id
> tstate
.slice_id
:
206 "Task %s-%s is behind ShardState %s. Dropping task.""",
207 tstate.shard_id, tstate.slice_id, shard_state.slice_id)
208 return self._TASK_DIRECTIVE.DROP_TASK
212 elif shard_state.slice_id < tstate.slice_id:
214 "Task
%s-%s is ahead of ShardState
%s. Waiting
for it to catch up
.",
215 tstate.shard_id, tstate.slice_id, shard_state.slice_id)
216 return self._TASK_DIRECTIVE.RETRY_TASK
220 if shard_state.slice_start_time:
221 countdown = self._wait_time(shard_state,
222 parameters.config._LEASE_GRACE_PERIOD +
223 parameters.config._SLICE_DURATION_SEC)
226 "Last retry of
slice %s-%s may be still running
."
227 "Will
try again
in %s seconds
", tstate.shard_id, tstate.slice_id,
232 time.sleep(countdown)
233 return self._TASK_DIRECTIVE.RETRY_TASK
236 if self._wait_time(shard_state,
237 parameters.config._REQUEST_EVENTUAL_TIMEOUT):
238 if not self._has_old_request_ended(shard_state):
240 "Last retry of
slice %s-%s is still
in flight with request_id
"
241 "%s. Will
try again later
.", tstate.shard_id, tstate.slice_id,
242 shard_state.slice_request_id)
243 return self._TASK_DIRECTIVE.RETRY_TASK
246 "Last retry of
slice %s-%s has no log entry
and has
"
247 "timed out after
%s seconds
",
248 tstate.shard_id, tstate.slice_id,
249 parameters.config._REQUEST_EVENTUAL_TIMEOUT)
252 config = util.create_datastore_write_config(tstate.mapreduce_spec)
253 @db.transactional(retries=5)
255 """Use datastore to set slice_start_time to now.
257 If failed for any reason, raise error to retry the task (hence all
258 the previous validation code). The task would die naturally eventually.
261 A _TASK_DIRECTIVE enum.
263 fresh_state = model.ShardState.get_by_shard_id(tstate.shard_id)
265 logging.warning("ShardState missing
.")
267 if (fresh_state.active and
268 fresh_state.slice_id == shard_state.slice_id and
269 fresh_state.slice_start_time == shard_state.slice_start_time):
270 shard_state.slice_start_time = datetime.datetime.now()
271 shard_state.slice_request_id = os.environ.get("REQUEST_LOG_ID
")
272 shard_state.acquired_once = True
273 shard_state.put(config=config)
274 return self._TASK_DIRECTIVE.PROCEED_TASK
277 "Contention on
slice %s-%s execution
. Will retry again
.",
278 tstate.shard_id, tstate.slice_id)
280 time.sleep(random.randrange(1, 5))
281 return self._TASK_DIRECTIVE.RETRY_TASK
285 def _has_old_request_ended(self, shard_state):
286 """Whether previous slice retry has ended according to Logs API.
289 shard_state: shard state.
292 True if the request of previous slice retry has ended. False if it has
295 assert shard_state.slice_start_time is not None
296 assert shard_state.slice_request_id is not None
297 request_ids = [shard_state.slice_request_id]
298 logs = list(logservice.fetch(
299 request_ids=request_ids,
301 module_versions=[(os.environ["CURRENT_MODULE_ID
"],
302 modules.get_current_version_name())]))
304 if not logs or not logs[0].finished:
308 def _wait_time(self, shard_state, secs, now=datetime.datetime.now):
309 """Time to wait until slice_start_time is secs ago from now.
312 shard_state: shard state.
313 secs: duration in seconds.
314 now: a func that gets now.
317 0 if no wait. A positive int in seconds otherwise. Always rounded up.
319 assert shard_state.slice_start_time is not None
320 delta = now() - shard_state.slice_start_time
321 duration = datetime.timedelta(seconds=secs)
323 return util.total_seconds(duration - delta)
327 def _try_free_lease(self, shard_state, slice_retry=False):
328 """Try to free lease.
330 A lightweight transaction to update shard_state and unset
331 slice_start_time to allow the next retry to happen without blocking.
332 We don't care if this fails or not because the lease will expire
335 Under normal execution, _save_state_and_schedule_next is the exit point.
336 It updates/saves shard state and schedules the next slice or returns.
337 Other exit points are:
338 1. _are_states_consistent: at the beginning of handle, checks
339 if datastore states and the task are in sync.
340 If not, raise or return.
341 2. _attempt_slice_retry: may raise exception to taskqueue.
342 3. _save_state_and_schedule_next: may raise exception when taskqueue/db
345 This handler should try to free the lease on every exceptional exit point.
348 shard_state: model.ShardState.
349 slice_retry: whether to count this as a failed slice execution.
353 fresh_state = model.ShardState.get_by_shard_id(shard_state.shard_id)
354 if fresh_state and fresh_state.active:
356 fresh_state.slice_start_time = None
357 fresh_state.slice_request_id = None
359 fresh_state.slice_retries += 1
367 "Release lock
for shard
%s failed
. Wait
for lease to expire
.",
368 shard_state.shard_id)
373 This method has to be careful to pass the same ShardState instance to
374 its subroutines calls if the calls mutate or read from ShardState.
375 Note especially that Context instance caches and updates the ShardState
379 Set HTTP status code and always returns None.
382 self._start_time = self._time()
383 shard_id = self.request.headers[util._MR_SHARD_ID_TASK_HEADER]
384 mr_id = self.request.headers[util._MR_ID_TASK_HEADER]
385 spec = model.MapreduceSpec._get_mapreduce_spec(mr_id)
386 shard_state, control = db.get([
387 model.ShardState.get_key_by_shard_id(shard_id),
388 model.MapreduceControl.get_key_by_job_id(mr_id),
392 ctx = context.Context(spec, shard_state,
393 task_retry_count=self.task_retry_count())
394 context.Context._set(ctx)
397 tstate = model.TransientShardState.from_request(self.request)
401 is_this_a_retry = shard_state.acquired_once
402 task_directive = self._try_acquire_lease(shard_state, tstate)
403 if task_directive == self._TASK_DIRECTIVE.RETRY_TASK:
404 return self.retry_task()
405 if task_directive == self._TASK_DIRECTIVE.DROP_TASK:
407 assert task_directive == self._TASK_DIRECTIVE.PROCEED_TASK
410 if control and control.command == model.MapreduceControl.ABORT:
411 logging.info("Abort command received by shard
%d of job
'%s'",
412 shard_state.shard_number, shard_state.mapreduce_id)
415 shard_state.set_for_abort()
416 shard_state.put(config=util.create_datastore_write_config(spec))
421 util._set_ndb_cache_policy()
423 cloudstorage.set_default_retry_params(
424 cloudstorage.RetryParams(
425 urlfetch_timeout=parameters._GCS_URLFETCH_TIMEOUT_SEC))
429 task_directive = self._attempt_slice_recovery(shard_state, tstate)
431 if task_directive == self._TASK_DIRECTIVE.PROCEED_TASK:
432 finished_shard = self._process_inputs(
433 tstate.input_reader, shard_state, tstate, ctx)
437 if tstate.output_writer:
442 tstate.output_writer.finalize(ctx, shard_state)
443 shard_state.set_for_success()
446 logging.warning("Shard
%s got error
.", shard_state.shard_id)
447 logging.error(traceback.format_exc())
450 if type(e) is errors.FailJobError:
451 logging.error("Got FailJobError
.")
452 task_directive = self._TASK_DIRECTIVE.FAIL_TASK
454 task_directive = self._TASK_DIRECTIVE.RETRY_TASK
456 task_directive = self._set_state(shard_state, tstate, task_directive)
457 self._save_state_and_schedule_next(shard_state, tstate, task_directive)
459 def _process_inputs(self,
464 """Read inputs, process them, and write out outputs.
466 This is the core logic of MapReduce. It reads inputs from input reader,
467 invokes user specified mapper function, and writes output with
468 output writer. It also updates shard_state accordingly.
469 e.g. if shard processing is done, set shard_state.active to False.
471 If errors.FailJobError is caught, it will fail this MR job.
472 All other exceptions will be logged and raised to taskqueue for retry
473 until the number of retries exceeds a limit.
476 input_reader: input reader.
477 shard_state: shard state.
478 tstate: transient shard state.
479 ctx: mapreduce context.
482 Whether this shard has finished processing all its input split.
484 processing_limit = self._processing_limit(tstate.mapreduce_spec)
485 if processing_limit == 0:
488 finished_shard = True
490 iterator = iter(input_reader)
494 entity = iterator.next()
495 except StopIteration:
508 if isinstance(entity, db.Model):
509 shard_state.last_work_item = repr(entity.key())
510 elif isinstance(entity, ndb.Model):
511 shard_state.last_work_item = repr(entity.key)
513 shard_state.last_work_item = repr(entity)[:100]
515 processing_limit -= 1
517 if not self._process_datum(
518 entity, input_reader, ctx, tstate):
519 finished_shard = False
521 elif processing_limit == 0:
522 finished_shard = False
526 operation.counters.Increment(
527 context.COUNTER_MAPPER_WALLTIME_MS,
528 int((self._time() - self._start_time)*1000))(ctx)
531 return finished_shard
533 def _process_datum(self, data, input_reader, ctx, transient_shard_state):
534 """Process a single data piece.
536 Call mapper handler on the data.
539 data: a datum to process.
540 input_reader: input reader.
541 ctx: mapreduce context
542 transient_shard_state: transient shard state.
545 True if scan should be continued, False if scan should be stopped.
547 if data is not input_readers.ALLOW_CHECKPOINT:
548 ctx.counters.increment(context.COUNTER_MAPPER_CALLS)
550 handler = transient_shard_state.handler
552 if input_reader.expand_parameters:
553 result = handler(*data)
555 result = handler(data)
557 if util.is_generator(result):
558 for output in result:
559 if isinstance(output, operation.Operation):
562 output_writer = transient_shard_state.output_writer
563 if not output_writer:
565 "Handler yielded
%s, but no output writer
is set.", output)
567 output_writer.write(output)
569 if self._time() - self._start_time >= parameters.config._SLICE_DURATION_SEC:
def _set_state(self, shard_state, tstate, task_directive):
    """Update shard_state and tstate according to task_directive.

    Args:
      shard_state: model.ShardState for current shard.
      tstate: model.TransientShardState for current shard.
      task_directive: self._TASK_DIRECTIVE for current shard.

    Returns:
      A _TASK_DIRECTIVE enum. PROCEED_TASK and RECOVER_SLICE are returned
      unchanged after both states are advanced to the next slice. RETRY_TASK
      is replaced by the outcome of the slice retry attempt, RETRY_SHARD by
      the outcome of the shard retry attempt. If the resulting directive is
      FAIL_TASK, shard_state is marked as failed before returning.
    """
    directives = self._TASK_DIRECTIVE

    if task_directive == directives.PROCEED_TASK:
        shard_state.advance_for_next_slice()
        tstate.advance_for_next_slice()
    elif task_directive == directives.RECOVER_SLICE:
        tstate.advance_for_next_slice(recovery_slice=True)
        shard_state.advance_for_next_slice(recovery_slice=True)
    else:
        # Escalation chain: a slice retry may turn into a shard retry, and a
        # shard retry may turn into a permanent failure. Each step re-reads
        # the directive produced by the previous one.
        if task_directive == directives.RETRY_TASK:
            task_directive = self._attempt_slice_retry(shard_state, tstate)
        if task_directive == directives.RETRY_SHARD:
            task_directive = self._attempt_shard_retry(shard_state, tstate)
        if task_directive == directives.FAIL_TASK:
            shard_state.set_for_failure()

    return task_directive
605 def _save_state_and_schedule_next(self, shard_state, tstate, task_directive):
606 """Save state and schedule task.
608 Save shard state to datastore.
609 Schedule next slice if needed.
610 Set HTTP response code.
611 No modification to any shard_state or tstate.
614 shard_state: model.ShardState for current shard.
615 tstate: model.TransientShardState for current shard.
616 task_directive: enum _TASK_DIRECTIVE.
618 spec = tstate.mapreduce_spec
620 if task_directive == self._TASK_DIRECTIVE.RETRY_TASK:
622 return self.retry_task()
623 elif task_directive == self._TASK_DIRECTIVE.FAIL_TASK:
624 logging.critical("Shard
%s failed permanently
.", shard_state.shard_id)
626 elif task_directive == self._TASK_DIRECTIVE.RETRY_SHARD:
627 logging.warning("Shard
%s is going to be attempted
for the
%s time
.",
628 shard_state.shard_id,
629 shard_state.retries + 1)
630 task = self._state_to_task(tstate, shard_state)
631 elif task_directive == self._TASK_DIRECTIVE.RECOVER_SLICE:
632 logging.warning("Shard
%s slice %s is being recovered
.",
633 shard_state.shard_id,
634 shard_state.slice_id)
635 task = self._state_to_task(tstate, shard_state)
637 assert task_directive == self._TASK_DIRECTIVE.PROCEED_TASK
638 countdown = self._get_countdown_for_next_slice(spec)
639 task = self._state_to_task(tstate, shard_state, countdown=countdown)
642 queue_name = os.environ.get("HTTP_X_APPENGINE_QUEUENAME
",
646 config = util.create_datastore_write_config(spec)
648 @db.transactional(retries=5)
650 fresh_shard_state = model.ShardState.get_by_shard_id(tstate.shard_id)
651 if not fresh_shard_state:
653 if (not fresh_shard_state.active or
654 "worker_active_state_collision
" in _TEST_INJECTED_FAULTS):
655 logging.warning("Shard
%s is not active
. Possible spurious task
"
656 "execution
. Dropping this task
.", tstate.shard_id)
657 logging.warning("Datastore
's %s", str(fresh_shard_state))
658 logging.warning("Slice's
%s", str(shard_state))
660 fresh_shard_state.copy_from(shard_state)
661 fresh_shard_state.put(config=config)
666 if fresh_shard_state.active:
669 self._add_task(task, spec, queue_name)
673 except (datastore_errors.Error,
675 runtime.DeadlineExceededError,
676 apiproxy_errors.Error), e:
678 "Can
't transactionally continue shard. "
679 "Will retry slice %s %s for the %s time.",
682 self.task_retry_count() + 1)
683 self._try_free_lease(shard_state)
def _attempt_slice_recovery(self, shard_state, tstate):
    """Decide whether this slice must be spent on output recovery.

    This is run when a slice had been previously attempted and output
    may have been written. If the output writer requires slice recovery,
    its recovery logic is run to remove output duplicates. Otherwise the
    slice is simply retried.

    If recovery is needed, then the entire slice will be dedicated
    to recovery logic. No data processing will take place. Thus we call
    the slice "recovery slice". This is needed for correctness:
    An output writer instance can be out of sync from its physical
    medium only when the slice dies after acquiring the shard lock but before
    committing shard state to db. The worst failure case is when
    shard state failed to commit after the NAMED task for the next slice was
    added. Thus, recovery slice has a special logic to increment current
    slice_id n to n+2. If the task for n+1 had been added, it will be dropped
    because it is behind shard state.

    Args:
      shard_state: an instance of Model.ShardState.
      tstate: an instance of Model.TransientShardState.

    Returns:
      _TASK_DIRECTIVE.PROCEED_TASK to continue with this retry.
      _TASK_DIRECTIVE.RECOVER_SLICE to recover this slice.
      The next slice will start at the same input as
      this slice but output to a new instance of output writer.
      Combining outputs from all writer instances is up to implementation.
    """
    mapreduce_spec = tstate.mapreduce_spec
    mapper_spec = mapreduce_spec.mapper
    writer = tstate.output_writer

    if writer and writer._supports_slice_recovery(mapper_spec):
        # Swap in the writer instance returned by the recovery hook; the
        # caller is told to treat this slice as a recovery slice.
        tstate.output_writer = writer._recover(
            mapreduce_spec,
            shard_state.shard_number,
            shard_state.retries + 1)
        return self._TASK_DIRECTIVE.RECOVER_SLICE

    return self._TASK_DIRECTIVE.PROCEED_TASK
726 def _attempt_shard_retry(self, shard_state, tstate):
727 """Whether to retry shard.
729 This method may modify shard_state and tstate to prepare for retry or fail.
732 shard_state: model.ShardState for current shard.
733 tstate: model.TransientShardState for current shard.
736 A _TASK_DIRECTIVE enum. RETRY_SHARD if shard should be retried.
739 shard_attempts = shard_state.retries + 1
741 if shard_attempts >= parameters.config.SHARD_MAX_ATTEMPTS:
743 "Shard attempt %s exceeded %s max attempts.",
744 shard_attempts, parameters.config.SHARD_MAX_ATTEMPTS)
745 return self._TASK_DIRECTIVE.FAIL_TASK
746 if tstate.output_writer and (
747 not tstate.output_writer._supports_shard_retry(tstate)):
748 logging.warning("Output writer %s does not support shard retry.",
749 tstate.output_writer.__class__.__name__)
750 return self._TASK_DIRECTIVE.FAIL_TASK
752 shard_state.reset_for_retry()
753 logging.warning("Shard %s attempt %s failed with up to %s attempts.",
754 shard_state.shard_id,
756 parameters.config.SHARD_MAX_ATTEMPTS)
758 if tstate.output_writer:
759 output_writer = tstate.output_writer.create(
760 tstate.mapreduce_spec, shard_state.shard_number, shard_attempts + 1)
761 tstate.reset_for_retry(output_writer)
762 return self._TASK_DIRECTIVE.RETRY_SHARD
764 def _attempt_slice_retry(self, shard_state, tstate):
765 """Attempt to retry this slice.
767 This method may modify shard_state and tstate to prepare for retry or fail.
770 shard_state: model.ShardState for current shard.
771 tstate: model.TransientShardState for current shard.
774 A _TASK_DIRECTIVE enum. RETRY_TASK if slice should be retried.
775 RETRY_SHARD if shard retry should be attempted.
777 if (shard_state.slice_retries + 1 <
778 parameters.config.TASK_MAX_DATA_PROCESSING_ATTEMPTS):
780 "Slice %s %s failed for the %s of up to %s attempts "
781 "(%s of %s taskqueue execution attempts). "
785 shard_state.slice_retries + 1,
786 parameters.config.TASK_MAX_DATA_PROCESSING_ATTEMPTS,
787 self.task_retry_count() + 1,
788 parameters.config.TASK_MAX_ATTEMPTS)
793 self._try_free_lease(shard_state, slice_retry=True)
794 return self._TASK_DIRECTIVE.RETRY_TASK
796 if parameters.config.TASK_MAX_DATA_PROCESSING_ATTEMPTS > 0:
797 logging.warning("Slice attempt %s exceeded %s max attempts.",
798 self.task_retry_count() + 1,
799 parameters.config.TASK_MAX_DATA_PROCESSING_ATTEMPTS)
800 return self._TASK_DIRECTIVE.RETRY_SHARD
def get_task_name(shard_id, slice_id, retry=0):
    """Compute single worker task name.

    Args:
      shard_id: id of the shard the task works on.
      slice_id: id of the slice the task works on.
      retry: current shard retry count.

    Returns:
      task name which should be used to process specified shard/slice.
    """
    name_fields = (shard_id, slice_id, retry)
    return "appengine-mrshard-%s-%s-retry-%s" % name_fields
819 def _get_countdown_for_next_slice(self, spec):
820 """Get countdown for next slice's task
.
822 When user sets processing rate
, we
set countdown to delay task execution
.
825 spec
: model
.MapreduceSpec
831 if self._processing_limit(spec) != -1:
833 int(parameters.config._SLICE_DURATION_SEC -
834 (self._time() - self._start_time)), 0)
838 def _state_to_task(cls,
843 """Generate task
for slice according to current states
.
846 tstate
: An instance of TransientShardState
.
847 shard_state
: An instance of ShardState
.
848 eta
: Absolute time when the MR should execute
. May
not be specified
849 if 'countdown' is also supplied
. This may be timezone
-aware
or
851 countdown
: Time
in seconds into the future that this MR should execute
.
855 A model
.HugeTask instance
for the
slice specified by current states
.
857 base_path = tstate.base_path
859 task_name = MapperWorkerCallbackHandler.get_task_name(
864 headers = util._get_task_headers(tstate.mapreduce_spec)
865 headers[util._MR_SHARD_ID_TASK_HEADER] = tstate.shard_id
867 worker_task = model.HugeTask(
868 url=base_path + "/worker_callback/" + tstate.shard_id,
869 params=tstate.to_dict(),
882 """Schedule
slice scanning by adding it to the task queue
.
885 worker_task
: a model
.HugeTask task
for slice. This
is NOT a taskqueue
887 mapreduce_spec
: an instance of model
.MapreduceSpec
.
888 queue_name
: Optional queue to run on
; uses the current queue of
889 execution
or the default queue
if unspecified
.
891 if not _run_task_hook(mapreduce_spec.get_hooks(),
892 "enqueue_worker_task",
898 worker_task.add(queue_name)
899 except (taskqueue.TombstonedTaskError,
900 taskqueue.TaskAlreadyExistsError), e:
901 logging.warning("Task %r already exists. %s: %s",
def _processing_limit(self, spec):
    """Get the limit on the number of map calls allowed by this slice.

    The limit is derived from the "processing_rate" mapper parameter: the
    rate is multiplied by the slice duration and divided by the shard
    count, then rounded up.

    Args:
      spec: a Mapreduce spec.

    Returns:
      The limit as a positive int if specified by user. -1 otherwise.
    """
    rate = float(spec.mapper.params.get("processing_rate", 0))
    # Written as "not rate > 0" rather than "rate <= 0" so that a NaN rate
    # is also treated as "no limit".
    if not rate > 0:
        return -1

    calls_per_slice = (
        parameters.config._SLICE_DURATION_SEC * rate /
        int(spec.mapper.shard_count))
    return int(math.ceil(calls_per_slice))
926 def _schedule_slice(cls,
932 """Schedule
slice scanning by adding it to the task queue
.
935 shard_state
: An instance of ShardState
.
936 tstate
: An instance of TransientShardState
.
937 queue_name
: Optional queue to run on
; uses the current queue of
938 execution
or the default queue
if unspecified
.
939 eta
: Absolute time when the MR should execute
. May
not be specified
940 if 'countdown' is also supplied
. This may be timezone
-aware
or
942 countdown
: Time
in seconds into the future that this MR should execute
.
945 queue_name = queue_name or os.environ.get("HTTP_X_APPENGINE_QUEUENAME",
947 task = cls._state_to_task(tstate, shard_state, eta, countdown)
948 cls._add_task(task, tstate.mapreduce_spec, queue_name)
951 class ControllerCallbackHandler(base_handler.HugeTaskHandler):
952 """Supervises mapreduce execution
.
954 Is also responsible
for gathering execution status
from shards together
.
956 This task
is "continuously" running by adding itself again to taskqueue
if
957 and only
if mapreduce
is still active
. A mapreduce
is active
if it has
958 actively running shards
.
def __init__(self, *args):
    """Initialize the controller callback handler.

    Args:
      *args: positional arguments forwarded unchanged to the parent
        handler's constructor.
    """
    super(ControllerCallbackHandler, self).__init__(*args)
    # Clock callable. Other methods call self._time() instead of time.time()
    # directly (presumably so the clock can be replaced in tests).
    self._time = time.time
966 def _drop_gracefully(self):
967 """Gracefully drop controller task
.
969 This method
is called when decoding controller task payload failed
.
970 Upon this we mark ShardState
and MapreduceState
as failed so all
973 Writing to datastore
is forced (ignore read
-only mode
) because we
974 want the tasks to stop badly
, and if force_writes was
False,
975 the job would have never been started
.
977 mr_id = self.request.headers[util._MR_ID_TASK_HEADER]
978 state = model.MapreduceState.get_by_job_id(mr_id)
979 if not state or not state.active:
983 state.result_status = model.MapreduceState.RESULT_FAILED
984 config = util.create_datastore_write_config(state.mapreduce_spec)
986 for ss in model.ShardState.find_all_by_mapreduce_state(state):
991 if len(puts) > model.ShardState._MAX_STATES_IN_MEMORY:
992 db.put(puts, config=config)
994 db.put(puts, config=config)
996 db.put(state, config=config)
999 """Handle request
."""
1000 spec = model.MapreduceSpec.from_json_str(
1001 self.request.get("mapreduce_spec"))
1002 state, control = db.get([
1003 model.MapreduceState.get_key_by_job_id(spec.mapreduce_id),
1004 model.MapreduceControl.get_key_by_job_id(spec.mapreduce_id),
1008 logging.warning("State not found for MR '%s'; dropping controller task.",
1011 if not state.active:
1013 "MR %r is not active. Looks like spurious controller task execution.",
1015 self._clean_up_mr(spec)
1018 shard_states = model.ShardState.find_all_by_mapreduce_state(state)
1019 self._update_state_from_shard_states(state, shard_states, control)
1022 ControllerCallbackHandler.reschedule(
1023 state, spec, self.serial_id() + 1)
1025 def _update_state_from_shard_states(self, state, shard_states, control):
1026 """Update mr state by examining shard states
.
1029 state
: current mapreduce state
as MapreduceState
.
1030 shard_states
: an iterator over shard states
.
1031 control
: model
.MapreduceControl entity
.
1034 state.active_shards, state.aborted_shards, state.failed_shards = 0, 0, 0
1036 processed_counts = []
1037 state.counters_map.clear()
1040 for s in shard_states:
1043 state.active_shards += 1
1044 if s.result_status == model.ShardState.RESULT_ABORTED:
1045 state.aborted_shards += 1
1046 elif s.result_status == model.ShardState.RESULT_FAILED:
1047 state.failed_shards += 1
1050 state.counters_map.add_map(s.counters_map)
1051 processed_counts.append(s.counters_map.get(context.COUNTER_MAPPER_CALLS))
1053 state.set_processed_counts(processed_counts)
1054 state.last_poll_time = datetime.datetime.utcfromtimestamp(self._time())
1056 spec = state.mapreduce_spec
1058 if total_shards != spec.mapper.shard_count:
1059 logging.error("Found %d shard states. Expect %d. "
1060 "Issuing abort command to job '%s'",
1061 total_shards, spec.mapper.shard_count,
1064 model.MapreduceControl.abort(spec.mapreduce_id)
1068 state.active = bool(state.active_shards)
1069 if not control and (state.failed_shards or state.aborted_shards):
1071 model.MapreduceControl.abort(spec.mapreduce_id)
1073 if not state.active:
1075 if state.failed_shards or not total_shards:
1076 state.result_status = model.MapreduceState.RESULT_FAILED
1079 elif state.aborted_shards:
1080 state.result_status = model.MapreduceState.RESULT_ABORTED
1082 state.result_status = model.MapreduceState.RESULT_SUCCESS
1083 self._finalize_outputs(spec, state)
1084 self._finalize_job(spec, state)
1086 @db.transactional(retries=5)
1088 fresh_state = model.MapreduceState.get_by_job_id(spec.mapreduce_id)
1091 if not fresh_state.active:
1093 "Job %s is not active. Looks like spurious task execution. "
1094 "Dropping controller task.", spec.mapreduce_id)
1096 config = util.create_datastore_write_config(spec)
1097 state.put(config=config)
def serial_id(self):
    """Get serial unique identifier of this task from request.

    Returns:
      serial identifier as int.
    """
    raw_serial_id = self.request.get("serial_id")
    return int(raw_serial_id)
1110 def _finalize_outputs(cls, mapreduce_spec, mapreduce_state):
1111 """Finalize outputs
.
1114 mapreduce_spec
: an instance of MapreduceSpec
.
1115 mapreduce_state
: an instance of MapreduceState
.
1118 if (mapreduce_spec.mapper.output_writer_class() and
1119 mapreduce_state.result_status == model.MapreduceState.RESULT_SUCCESS):
1120 mapreduce_spec.mapper.output_writer_class().finalize_job(mapreduce_state)
def _finalize_job(cls, mapreduce_spec, mapreduce_state):
  """Finalize job execution.

  Saves the mapreduce state and enqueues the optional done-callback task
  inside one datastore transaction, then logs the final result and schedules
  the necessary clean ups. This method is idempotent.

  Args:
    mapreduce_spec: an instance of MapreduceSpec
    mapreduce_state: an instance of MapreduceState
  """
  write_config = util.create_datastore_write_config(mapreduce_spec)
  callback_queue = mapreduce_spec.params.get(
      model.MapreduceSpec.PARAM_DONE_CALLBACK_QUEUE)
  queue_name = util.get_queue_name(callback_queue)
  callback_url = mapreduce_spec.params.get(
      model.MapreduceSpec.PARAM_DONE_CALLBACK)

  # The done task only exists when the job was configured with a callback.
  done_task = None
  if callback_url:
    callback_headers = util._get_task_headers(
        mapreduce_spec, util.CALLBACK_MR_ID_TASK_HEADER)
    callback_method = mapreduce_spec.params.get("done_callback_method", "POST")
    done_task = taskqueue.Task(
        url=callback_url,
        headers=callback_headers,
        method=callback_method)

  @db.transactional(retries=5)
  def _commit_final_state():
    """Store the final state and enqueue the done task transactionally."""
    current_state = model.MapreduceState.get_by_job_id(
        mapreduce_spec.mapreduce_id)
    if not current_state.active:
      # The stored state was already marked inactive, so this run is a
      # duplicate; leave everything untouched.
      logging.warning(
          "Job %s is not active. Looks like spurious task execution. "
          "Dropping task.", mapreduce_spec.mapreduce_id)
      return
    mapreduce_state.put(config=write_config)

    if not done_task:
      return
    handled_by_hook = _run_task_hook(
        mapreduce_spec.get_hooks(),
        "enqueue_done_task",
        done_task,
        queue_name)
    if not handled_by_hook:
      done_task.add(queue_name, transactional=True)

  _commit_final_state()
  logging.info("Final result for job '%s' is '%s'",
               mapreduce_spec.mapreduce_id, mapreduce_state.result_status)
  cls._clean_up_mr(mapreduce_spec)
def _clean_up_mr(cls, mapreduce_spec):
  """Schedule the finalize task for the given mapreduce.

  Args:
    mapreduce_spec: an instance of MapreduceSpec.
  """
  FinalizeJobHandler.schedule(mapreduce_spec)
def get_task_name(mapreduce_spec, serial_id):
  """Compute single controller task name.

  Args:
    mapreduce_spec: specification of the mapreduce; its mapreduce_id becomes
      part of the task name.
    serial_id: id of the controller invocation.

  Returns:
    task name which should be used for the specified controller invocation.
  """
  return "appengine-mrcontrol-%s-%s" % (
      mapreduce_spec.mapreduce_id, serial_id)
def controller_parameters(mapreduce_spec, serial_id):
  """Build the payload parameters of a controller task.

  The returned map is meant to be used as the task payload and carries all
  the data the controller needs to perform its function.

  Args:
    mapreduce_spec: specification of the mapreduce.
    serial_id: id of the invocation as int.

  Returns:
    string->string map of parameters to be used as task payload.
  """
  payload = {}
  payload["mapreduce_spec"] = mapreduce_spec.to_json_str()
  payload["serial_id"] = str(serial_id)
  return payload
# NOTE(review): the signature is not visible in the reviewed excerpt; it is
# inferred from the Args section below and from the call sites - confirm.
def reschedule(cls,
               mapreduce_state,
               mapreduce_spec,
               serial_id,
               queue_name=None):
  """Schedule new update status callback task.

  Args:
    mapreduce_state: mapreduce state as model.MapreduceState
    mapreduce_spec: mapreduce specification as MapreduceSpec.
    serial_id: id of the invocation as int.
    queue_name: The queue to schedule this task on. Will use the current
      queue of execution if not supplied.
  """
  task_name = ControllerCallbackHandler.get_task_name(
      mapreduce_spec, serial_id)
  task_params = ControllerCallbackHandler.controller_parameters(
      mapreduce_spec, serial_id)
  if not queue_name:
    # Fall back to the queue named in the request environment.
    queue_name = os.environ.get("HTTP_X_APPENGINE_QUEUENAME", "default")

  controller_callback_task = model.HugeTask(
      url=(mapreduce_spec.params["base_path"] + "/controller_callback/" +
           mapreduce_spec.mapreduce_id),
      name=task_name, params=task_params,
      countdown=parameters.config._CONTROLLER_PERIOD_SEC,
      parent=mapreduce_state,
      headers=util._get_task_headers(mapreduce_spec))

  if not _run_task_hook(mapreduce_spec.get_hooks(),
                        "enqueue_controller_task",
                        controller_callback_task,
                        queue_name):
    try:
      controller_callback_task.add(queue_name)
    except (taskqueue.TombstonedTaskError,
            taskqueue.TaskAlreadyExistsError) as e:
      # The task is named, so a duplicate add means this invocation was
      # already scheduled; log it and carry on.
      logging.warning("Task %r with params %r already exists. %s: %s",
                      task_name, task_params, e.__class__, e)
class KickOffJobHandler(base_handler.TaskQueueHandler):
  """Taskqueue handler which kicks off a mapreduce processing.

  This handler is idempotent.

  Precondition:
    The Model.MapreduceState entity for this mr is already created and
    saved to datastore by StartJobHandler._start_map.

  Request Parameters:
    mapreduce_id: in string.
  """

  # Key name template of the entity storing the json serialized input readers.
  _SERIALIZED_INPUT_READERS_KEY = "input_readers_for_mr_%s"

  def handle(self):
    """Handles kick off request."""
    mr_id = self.request.get("mapreduce_id")

    logging.info("Processing kickoff for job %s", mr_id)
    state = model.MapreduceState.get_by_job_id(mr_id)
    if not self._check_mr_state(state, mr_id):
      return

    readers, serialized_readers_entity = self._get_input_readers(state)
    if readers is None:
      # No input to process: finish the job immediately as a success.
      logging.warning("Found no mapper input data to process.")
      state.active = False
      state.result_status = model.MapreduceState.RESULT_SUCCESS
      ControllerCallbackHandler._finalize_job(state.mapreduce_spec, state)
      return

    self._setup_output_writer(state)

    save_result = self._save_states(state, serialized_readers_entity)
    if save_result is None:
      # An earlier attempt already saved the states; reload the readers.
      readers, _ = self._get_input_readers(state)
    elif not save_result:
      return

    queue_name = self.request.headers.get("X-AppEngine-QueueName")
    spec = state.mapreduce_spec
    KickOffJobHandler._schedule_shards(
        spec, readers, queue_name, spec.params["base_path"], state)

    ControllerCallbackHandler.reschedule(
        state, state.mapreduce_spec, serial_id=0, queue_name=queue_name)

  def _get_input_readers(self, state):
    """Get input readers.

    Readers already serialized under this job's payload entity are decoded
    from it; otherwise the input is split afresh and a new, not yet saved,
    payload entity holding the serialized readers is created. On success the
    shard counts on the state and its mapper spec are updated.

    Args:
      state: a MapreduceState model.

    Returns:
      A tuple: (a list of input readers, a model._HugeTaskPayload entity).
      The payload entity contains the json serialized input readers.
      (None, None) when input reader splitting returned no data to process.
    """
    payload_key_name = (self._SERIALIZED_INPUT_READERS_KEY %
                        state.key().id_or_name())
    payload_entity = model._HugeTaskPayload.get_by_key_name(
        payload_key_name, parent=state)
    needs_serialization = payload_entity is None

    reader_class = state.mapreduce_spec.mapper.input_reader_class()
    if needs_serialization:
      readers = reader_class.split_input(state.mapreduce_spec.mapper)
    else:
      readers = [reader_class.from_json_str(reader_json)
                 for reader_json in simplejson.loads(payload_entity.payload)]

    if not readers:
      return None, None

    reader_count = len(readers)
    state.mapreduce_spec.mapper.shard_count = reader_count
    state.active_shards = reader_count

    if needs_serialization:
      payload_entity = model._HugeTaskPayload(
          key_name=payload_key_name, parent=state)
      payload_entity.payload = simplejson.dumps(
          [reader.to_json_str() for reader in readers])
    return readers, payload_entity

  def _setup_output_writer(self, state):
    """Initialize the job's output writer unless writer state already exists.

    Args:
      state: a MapreduceState model.
    """
    if state.writer_state:
      return
    writer_class = state.mapreduce_spec.mapper.output_writer_class()
    if writer_class:
      writer_class.init_job(state)

  # NOTE(review): the decorator is not visible in the reviewed excerpt; it is
  # restored from the docstring's transaction contract - confirm its options.
  @db.transactional
  def _save_states(self, state, serialized_readers_entity):
    """Run transaction to save state.

    Args:
      state: a model.MapreduceState entity.
      serialized_readers_entity: a model._HugeTaskPayload entity containing
        json serialized input readers.

    Returns:
      False if a fatal error is encountered and this task should be dropped
      immediately. True if transaction is successful. None if a previous
      attempt of this same transaction has already succeeded.
    """
    mr_id = state.key().id_or_name()
    stored_state = model.MapreduceState.get_by_job_id(mr_id)
    if not self._check_mr_state(stored_state, mr_id):
      return False
    if stored_state.active_shards != 0:
      logging.warning(
          "Mapreduce %s already has active shards. Looks like spurious task "
          "execution.", mr_id)
      return None
    write_config = util.create_datastore_write_config(state.mapreduce_spec)
    db.put([state, serialized_readers_entity], config=write_config)
    return True

  @classmethod
  def _schedule_shards(cls,
                       spec,
                       readers,
                       queue_name,
                       base_path,
                       mr_state):
    """Prepares shard states and schedules their execution.

    Even though this method does not schedule shard task and save shard state
    transactionally, it's safe for taskqueue to retry this logic because
    the initial shard_state for each shard is the same from any retry.
    This is an important yet reasonable assumption on model.ShardState.

    Args:
      spec: mapreduce specification as MapreduceSpec.
      readers: list of InputReaders describing shard splits.
      queue_name: The queue to run this job on.
      base_path: The base url path of mapreduce callbacks.
      mr_state: The MapReduceState of current job.
    """
    # One fresh shard state per input reader.
    shard_states = []
    for shard_number, input_reader in enumerate(readers):
      new_state = model.ShardState.create_new(spec.mapreduce_id, shard_number)
      new_state.shard_description = str(input_reader)
      shard_states.append(new_state)

    # Only store shard states that are not in the datastore yet.
    stored_states = db.get(shard.key() for shard in shard_states)
    stored_keys = set(
        shard.key() for shard in stored_states if shard is not None)
    db.put((shard for shard in shard_states
            if shard.key() not in stored_keys),
           config=util.create_datastore_write_config(spec))

    # One output writer per shard, or None when no writer is configured.
    writer_class = spec.mapper.output_writer_class()
    if writer_class:
      writers = [
          writer_class.create(mr_state.mapreduce_spec,
                              shard_state.shard_number,
                              shard_state.retries + 1,
                              mr_state.writer_state)
          for shard_state in shard_states]
    else:
      writers = [None] * len(readers)

    # Schedule the first worker task of every shard.
    shard_inputs = zip(readers, writers, shard_states)
    for shard_number, (reader, writer, shard_state) in enumerate(shard_inputs):
      shard_id = model.ShardState.shard_id_from_number(
          spec.mapreduce_id, shard_number)
      transient_state = model.TransientShardState(
          base_path, spec, shard_id, 0, reader, reader,
          output_writer=writer,
          handler=spec.mapper.handler)
      task = MapperWorkerCallbackHandler._state_to_task(
          transient_state, shard_state)
      MapperWorkerCallbackHandler._add_task(task, spec, queue_name)

  @classmethod
  def _check_mr_state(cls, state, mr_id):
    """Check MapreduceState.

    Args:
      state: an MapreduceState instance.
      mr_id: mapreduce id.

    Returns:
      True if state is valid. False if not and this task should be dropped.
    """
    if state is None:
      logging.warning(
          "Mapreduce State for job %s is missing. Dropping Task.",
          mr_id)
      return False
    if state.active:
      return True
    logging.warning(
        "Mapreduce %s is not active. Looks like spurious task "
        "execution. Dropping Task.", mr_id)
    return False
class StartJobHandler(base_handler.PostJsonHandler):
  """Command handler starts a mapreduce job.

  This handler allows user to start a mr via a web form. Its _start_map
  method can also be used independently to start a mapreduce.
  """

  def handle(self):
    """Handles start request."""
    mapreduce_name = self._get_required_param("name")
    input_reader_spec = self._get_required_param("mapper_input_reader")
    handler_spec = self._get_required_param("mapper_handler")
    output_writer_spec = self.request.get("mapper_output_writer")
    mapper_params = self._get_params(
        "mapper_params_validator", "mapper_params.")
    params = self._get_params("params_validator", "params.")
    if "base_path" not in params:
      params["base_path"] = parameters.config.BASE_PATH

    # Normalize the mapper parameters the framework itself relies on.
    requested_rate = mapper_params.get("processing_rate")
    mapper_params["processing_rate"] = int(
        requested_rate or parameters.config.PROCESSING_RATE_PER_SEC)
    queue_name = util.get_queue_name(mapper_params.get("queue_name", None))
    mapper_params["queue_name"] = queue_name

    shard_count = int(
        mapper_params.get("shard_count", parameters.config.SHARD_COUNT))
    mapper_spec = model.MapperSpec(
        handler_spec,
        input_reader_spec,
        mapper_params,
        shard_count,
        output_writer_spec=output_writer_spec)

    mapreduce_id = type(self)._start_map(
        mapreduce_name,
        mapper_spec,
        params,
        queue_name=queue_name,
        _app=mapper_params.get("_app"))
    self.json_response["mapreduce_id"] = mapreduce_id

  def _get_params(self, validator_parameter, name_prefix):
    """Retrieves additional user-supplied params for the job and validates them.

    Args:
      validator_parameter: name of the request parameter which supplies
        validator for this parameter set.
      name_prefix: common prefix for all parameter names in the request.

    Returns:
      A dict mapping each matching parameter name, with the prefix stripped,
      to its value, or to the list of values when it was supplied repeatedly.

    Raises:
      Any exception raised by the 'params_validator' request parameter if
      the params fail to validate.
    """
    params_validator = self.request.get(validator_parameter)

    user_params = {}
    prefix_length = len(name_prefix)
    for key in self.request.arguments():
      if not key.startswith(name_prefix):
        continue
      values = self.request.get_all(key)
      user_params[key[prefix_length:]] = (
          values[0] if len(values) == 1 else values)

    if params_validator:
      validate = util.for_name(params_validator)
      validate(user_params)

    return user_params

  def _get_required_param(self, param_name):
    """Get a required request parameter.

    Args:
      param_name: name of request parameter to fetch.

    Returns:
      parameter value

    Raises:
      errors.NotEnoughArgumentsError: if parameter is not specified.
    """
    value = self.request.get(param_name)
    if value:
      return value
    raise errors.NotEnoughArgumentsError(param_name + " not specified")

  # NOTE(review): only the tail of this signature is visible in the reviewed
  # excerpt; the leading parameters are inferred from the body and from the
  # call in handle() - confirm against the callers.
  @classmethod
  def _start_map(cls,
                 name,
                 mapper_spec,
                 mapreduce_params,
                 queue_name,
                 eta=None,
                 countdown=None,
                 hooks_class_name=None,
                 _app=None,
                 in_xg_transaction=False):
    """See control.start_map.

    Requirements for this method:
    1. The request that invokes this method can either be regular or
       from taskqueue. So taskqueue specific headers can not be used.
    2. Each invocation transactionally starts an isolated mapreduce job with
       a unique id. MapreduceState should be immediately available after
       returning. See control.start_map's doc on transactional.
    3. Method should be lightweight.

    Returns:
      The id of the newly started mapreduce job.
    """
    # Validate reader and, if configured, writer settings before saving.
    mapper_spec.input_reader_class().validate(mapper_spec)

    output_writer_class = mapper_spec.output_writer_class()
    if output_writer_class:
      output_writer_class.validate(mapper_spec)

    mapreduce_id = model.MapreduceState.new_mapreduce_id()
    mapreduce_spec = model.MapreduceSpec(
        name,
        mapreduce_id,
        mapper_spec.to_json(),
        mapreduce_params,
        hooks_class_name)

    context.Context._set(context.Context(mapreduce_spec, None))
    try:
      # NOTE(review): the statement run under the context is not visible in
      # the reviewed excerpt; presumably the handler is resolved here so a bad
      # handler fails before any state is saved - confirm.
      mapper_spec.handler
    finally:
      context.Context._set(None)

    # Join the caller's transaction when asked to, else start a separate one.
    propagation = db.MANDATORY if in_xg_transaction else db.INDEPENDENT

    @db.transactional(propagation=propagation)
    def _save_and_enqueue():
      cls._create_and_save_state(mapreduce_spec, _app)
      cls._add_kickoff_task(mapreduce_params["base_path"], mapreduce_spec, eta,
                            countdown, queue_name)

    _save_and_enqueue()

    return mapreduce_id

  @classmethod
  def _create_and_save_state(cls, mapreduce_spec, _app):
    """Save mapreduce state to datastore.

    Save state to datastore so that UI can see it immediately.

    Args:
      mapreduce_spec: model.MapreduceSpec,
      _app: app id if specified. None otherwise.

    Returns:
      The saved Mapreduce state.
    """
    state = model.MapreduceState.create_new(mapreduce_spec.mapreduce_id)
    state.mapreduce_spec = mapreduce_spec
    # NOTE(review): the `active` and `app_id` assignments are not visible in
    # the reviewed excerpt; they are restored from the surrounding contract
    # (kickoff requires an active state, _app is documented as the app id) -
    # confirm.
    state.active = True
    state.active_shards = 0
    if _app:
      state.app_id = _app
    write_config = util.create_datastore_write_config(mapreduce_spec)
    state.put(config=write_config)
    return state

  @classmethod
  def _add_kickoff_task(cls,
                        base_path,
                        mapreduce_spec,
                        eta,
                        countdown,
                        queue_name):
    """Enqueue the kickoff task of a mapreduce job.

    The job's hooks get the first chance to enqueue the task; when there are
    no hooks, or they do not implement enqueue_kickoff_task, the task is added
    to the queue transactionally.

    Args:
      base_path: the base url path of mapreduce callbacks.
      mapreduce_spec: model.MapreduceSpec of the job.
      eta: passed through to taskqueue.Task as its eta.
      countdown: passed through to taskqueue.Task as its countdown.
      queue_name: name of the queue to add the task to.
    """
    kickoff_url = (
        base_path + "/kickoffjob_callback/" + mapreduce_spec.mapreduce_id)
    kickoff_task = taskqueue.Task(
        url=kickoff_url,
        headers=util._get_task_headers(mapreduce_spec),
        params={"mapreduce_id": mapreduce_spec.mapreduce_id},
        eta=eta,
        countdown=countdown)
    hooks = mapreduce_spec.get_hooks()
    if hooks is not None:
      try:
        hooks.enqueue_kickoff_task(kickoff_task, queue_name)
        return
      except NotImplementedError:
        # Hooks do not handle kickoff tasks; enqueue the task ourselves.
        pass
    kickoff_task.add(queue_name, transactional=True)
class FinalizeJobHandler(base_handler.TaskQueueHandler):
  """Finalize map job by deleting all temporary entities."""

  def handle(self):
    """Delete the control entity and all task payloads of a mapreduce job.

    Does nothing when no MapreduceState exists for the "mapreduce_id" request
    parameter.
    """
    mapreduce_id = self.request.get("mapreduce_id")
    mapreduce_state = model.MapreduceState.get_by_job_id(mapreduce_id)
    if not mapreduce_state:
      return

    config = util.create_datastore_write_config(mapreduce_state.mapreduce_spec)
    keys = [model.MapreduceControl.get_key_by_job_id(mapreduce_id)]
    # Payload entities are stored as descendants of each shard state and of
    # the mapreduce state itself. extend() consumes the key iterators
    # directly; no intermediate list is needed.
    for ss in model.ShardState.find_all_by_mapreduce_state(mapreduce_state):
      keys.extend(
          model._HugeTaskPayload.all().ancestor(ss).run(keys_only=True))
    keys.extend(model._HugeTaskPayload.all().ancestor(
        mapreduce_state).run(keys_only=True))
    db.delete(keys, config=config)

  @classmethod
  def schedule(cls, mapreduce_spec):
    """Schedule finalize task.

    Args:
      mapreduce_spec: mapreduce specification as MapreduceSpec.
    """
    task_name = mapreduce_spec.mapreduce_id + "-finalize"
    finalize_task = taskqueue.Task(
        name=task_name,
        url=(mapreduce_spec.params["base_path"] + "/finalizejob_callback/" +
             mapreduce_spec.mapreduce_id),
        params={"mapreduce_id": mapreduce_spec.mapreduce_id},
        headers=util._get_task_headers(mapreduce_spec))
    queue_name = util.get_queue_name(None)
    # NOTE(review): the finalize task is offered to the hooks through
    # "enqueue_controller_task", as in the original - confirm this is intended.
    if not _run_task_hook(mapreduce_spec.get_hooks(),
                          "enqueue_controller_task",
                          finalize_task,
                          queue_name):
      try:
        finalize_task.add(queue_name)
      except (taskqueue.TombstonedTaskError,
              taskqueue.TaskAlreadyExistsError) as e:
        # The task is named, so a duplicate add means finalization was
        # already scheduled; log it and carry on.
        logging.warning("Task %r already exists. %s: %s",
                        task_name, e.__class__, e)
class CleanUpJobHandler(base_handler.PostJsonHandler):
  """Command to kick off tasks to clean up a job's data."""

  def handle(self):
    """Delete a job's shard states and mapreduce state, then report status.

    The job is identified by the "mapreduce_id" request parameter. The status
    message is set whether or not a state was found for that id.
    """
    job_id = self.request.get("mapreduce_id")

    job_state = model.MapreduceState.get_by_job_id(job_id)
    if job_state:
      db.delete(model.ShardState.calculate_keys_by_mapreduce_state(job_state))
      db.delete(job_state)
    self.json_response["status"] = "Job %s successfully cleaned up." % job_id
class AbortJobHandler(base_handler.PostJsonHandler):
  """Command to abort a running job."""

  def handle(self):
    """Send the abort signal for the job named by "mapreduce_id"."""
    job_id = self.request.get("mapreduce_id")
    model.MapreduceControl.abort(job_id)
    self.json_response["status"] = "Abort signal sent."