3 # Copyright 2007 Google Inc.
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
9 # http://www.apache.org/licenses/LICENSE-2.0
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
32 """Defines executor tasks handlers for MapReduce implementation."""
49 from google
.appengine
.ext
import ndb
51 from google
.appengine
import runtime
52 from google
.appengine
.api
import datastore_errors
53 from google
.appengine
.api
import logservice
54 from google
.appengine
.api
import modules
55 from google
.appengine
.api
import taskqueue
56 from google
.appengine
.ext
import db
57 from google
.appengine
.ext
.mapreduce
import base_handler
58 from google
.appengine
.ext
.mapreduce
import context
59 from google
.appengine
.ext
.mapreduce
import errors
60 from google
.appengine
.ext
.mapreduce
import input_readers
61 from google
.appengine
.ext
.mapreduce
import map_job_context
62 from google
.appengine
.ext
.mapreduce
import model
63 from google
.appengine
.ext
.mapreduce
import operation
64 from google
.appengine
.ext
.mapreduce
import output_writers
65 from google
.appengine
.ext
.mapreduce
import parameters
66 from google
.appengine
.ext
.mapreduce
import shard_life_cycle
67 from google
.appengine
.ext
.mapreduce
import util
68 from google
.appengine
.ext
.mapreduce
.api
import map_job
69 from google
.appengine
.runtime
import apiproxy_errors
73 from google
.appengine
.ext
import cloudstorage
78 if hasattr(cloudstorage
, "_STUB"):
# Names of faults to simulate. The worker's state-saving transaction checks
# this set for "worker_active_state_collision" and, when present, treats the
# shard as if it were no longer active.
# NOTE(review): presumably only populated by tests -- confirm.
_TEST_INJECTED_FAULTS = set()
96 def _run_task_hook(hooks
, method
, task
, queue_name
):
97 """Invokes hooks.method(task, queue_name).
100 hooks: A hooks.Hooks instance or None.
101 method: The name of the method to invoke on the hooks class e.g.
102 "enqueue_kickoff_task".
103 task: The taskqueue.Task to pass to the hook method.
104 queue_name: The name of the queue to pass to the hook method.
107 True if the hooks.Hooks instance handled the method, False otherwise.
109 if hooks
is not None:
111 getattr(hooks
, method
)(task
, queue_name
)
112 except NotImplementedError:
120 class MapperWorkerCallbackHandler(base_handler
.HugeTaskHandler
):
121 """Callback handler for mapreduce worker task."""
# Directives describing what the handler should do with the current task.
_TASK_DIRECTIVE = util._enum(
    # Task should proceed normally.
    PROCEED_TASK="proceed_task",
    # Task should be retried.
    RETRY_TASK="retry_task",
    # Slice should be retried.
    RETRY_SLICE="retry_slice",
    # Task should be dropped.
    DROP_TASK="drop_task",
    # Slice should be recovered.
    RECOVER_SLICE="recover_slice",
    # Shard should be retried.
    RETRY_SHARD="retry_shard",
    # Shard should fail.
    FAIL_TASK="fail_task",
    # Shard should be aborted.
    ABORT_SHARD="abort_shard")
def __init__(self, *args):
  """Initializes the handler and its per-request bookkeeping.

  Args:
    *args: positional arguments forwarded unchanged to the base handler.
  """
  super(MapperWorkerCallbackHandler, self).__init__(*args)
  # Clock function kept on the instance; slice timing is measured through
  # self._time().
  self._time = time.time
  # Both contexts stay None until the request handler builds them for the
  # slice being processed.
  self.slice_context = None
  self.shard_context = None
def _drop_gracefully(self):
  """Drop worker task gracefully.

  Marks the shard named in the task headers as failed, provided it still
  exists and is active. Controller logic will take care of other shards and
  the entire MR.
  """
  headers = self.request.headers
  shard_id = headers[util._MR_SHARD_ID_TASK_HEADER]
  mr_id = headers[util._MR_ID_TASK_HEADER]

  # Fetch the shard state and the job state in a single datastore call.
  keys = [model.ShardState.get_key_by_shard_id(shard_id),
          model.MapreduceState.get_key_by_job_id(mr_id)]
  shard_state, mr_state = db.get(keys)

  # Nothing to record when the shard is unknown or has already stopped.
  if not shard_state or not shard_state.active:
    return

  shard_state.set_for_failure()
  write_config = util.create_datastore_write_config(mr_state.mapreduce_spec)
  shard_state.put(config=write_config)
168 def _try_acquire_lease(self
, shard_state
, tstate
):
169 """Validate datastore and the task payload are consistent.
171 If so, attempt to get a lease on this slice's execution.
172 See model.ShardState doc on slice_start_time.
175 shard_state: model.ShardState from datastore.
176 tstate: model.TransientShardState from taskqueue paylod.
179 A _TASK_DIRECTIVE enum. PROCEED_TASK if lock is acquired.
180 RETRY_TASK if task should be retried, DROP_TASK if task should
181 be dropped. Only old tasks (comparing to datastore state)
182 will be dropped. Future tasks are retried until they naturally
183 become old so that we don't ever stuck MR.
187 logging
.warning("State not found for shard %s; Possible spurious task "
188 "execution. Dropping this task.",
190 return self
._TASK
_DIRECTIVE
.DROP_TASK
192 if not shard_state
.active
:
193 logging
.warning("Shard %s is not active. Possible spurious task "
194 "execution. Dropping this task.", tstate
.shard_id
)
195 logging
.warning(str(shard_state
))
196 return self
._TASK
_DIRECTIVE
.DROP_TASK
199 if shard_state
.retries
> tstate
.retries
:
201 "Got shard %s from previous shard retry %s. Possible spurious "
202 "task execution. Dropping this task.",
205 logging
.warning(str(shard_state
))
206 return self
._TASK
_DIRECTIVE
.DROP_TASK
207 elif shard_state
.retries
< tstate
.retries
:
212 "ShardState for %s is behind slice. Waiting for it to catch up",
213 shard_state
.shard_id
)
214 return self
._TASK
_DIRECTIVE
.RETRY_TASK
218 if shard_state
.slice_id
> tstate
.slice_id
:
220 "Task %s-%s is behind ShardState %s. Dropping task.""",
221 tstate.shard_id, tstate.slice_id, shard_state.slice_id)
222 return self._TASK_DIRECTIVE.DROP_TASK
226 elif shard_state.slice_id < tstate.slice_id:
228 "Task
%s-%s is ahead of ShardState
%s. Waiting
for it to catch up
.",
229 tstate.shard_id, tstate.slice_id, shard_state.slice_id)
230 return self._TASK_DIRECTIVE.RETRY_TASK
234 if shard_state.slice_start_time:
235 countdown = self._wait_time(shard_state,
236 parameters._LEASE_DURATION_SEC)
239 "Last retry of
slice %s-%s may be still running
."
240 "Will
try again
in %s seconds
", tstate.shard_id, tstate.slice_id,
245 time.sleep(countdown)
246 return self._TASK_DIRECTIVE.RETRY_TASK
249 if self._wait_time(shard_state,
250 parameters._MAX_LEASE_DURATION_SEC):
251 if not self._has_old_request_ended(shard_state):
253 "Last retry of
slice %s-%s is still
in flight with request_id
"
254 "%s. Will
try again later
.", tstate.shard_id, tstate.slice_id,
255 shard_state.slice_request_id)
256 return self._TASK_DIRECTIVE.RETRY_TASK
259 "Last retry of
slice %s-%s has no log entry
and has
"
260 "timed out after
%s seconds
",
261 tstate.shard_id, tstate.slice_id,
262 parameters._MAX_LEASE_DURATION_SEC)
265 config = util.create_datastore_write_config(tstate.mapreduce_spec)
266 @db.transactional(retries=5)
268 """Use datastore to set slice_start_time to now.
270 If failed for any reason, raise error to retry the task (hence all
271 the previous validation code). The task would die naturally eventually.
274 Rollback: If the shard state is missing.
277 A _TASK_DIRECTIVE enum.
279 fresh_state = model.ShardState.get_by_shard_id(tstate.shard_id)
281 logging.warning("ShardState missing
.")
283 if (fresh_state.active and
284 fresh_state.slice_id == shard_state.slice_id and
285 fresh_state.slice_start_time == shard_state.slice_start_time):
286 shard_state.slice_start_time = datetime.datetime.now()
287 shard_state.slice_request_id = os.environ.get("REQUEST_LOG_ID
")
288 shard_state.acquired_once = True
289 shard_state.put(config=config)
290 return self._TASK_DIRECTIVE.PROCEED_TASK
293 "Contention on
slice %s-%s execution
. Will retry again
.",
294 tstate.shard_id, tstate.slice_id)
296 time.sleep(random.randrange(1, 5))
297 return self._TASK_DIRECTIVE.RETRY_TASK
301 def _has_old_request_ended(self, shard_state):
302 """Whether previous slice retry has ended according to Logs API.
305 shard_state: shard state.
308 True if the request of previous slice retry has ended. False if it has
311 assert shard_state.slice_start_time is not None
312 assert shard_state.slice_request_id is not None
313 request_ids = [shard_state.slice_request_id]
314 logs = list(logservice.fetch(
315 request_ids=request_ids,
317 module_versions=[(os.environ["CURRENT_MODULE_ID
"],
318 modules.get_current_version_name())]))
320 if not logs or not logs[0].finished:
324 def _wait_time(self, shard_state, secs, now=datetime.datetime.now):
325 """Time to wait until slice_start_time is secs ago from now.
328 shard_state: shard state.
329 secs: duration in seconds.
330 now: a func that gets now.
333 0 if no wait. A positive int in seconds otherwise. Always around up.
335 assert shard_state.slice_start_time is not None
336 delta = now() - shard_state.slice_start_time
337 duration = datetime.timedelta(seconds=secs)
339 return util.total_seconds(duration - delta)
343 def _try_free_lease(self, shard_state, slice_retry=False):
344 """Try to free lease.
346 A lightweight transaction to update shard_state and unset
347 slice_start_time to allow the next retry to happen without blocking.
348 We don't care if this fails or not because the lease will expire
351 Under normal execution, _save_state_and_schedule_next is the exit point.
352 It updates/saves shard state and schedules the next slice or returns.
353 Other exit points are:
354 1. _are_states_consistent: at the beginning of handle, checks
355 if datastore states and the task are in sync.
356 If not, raise or return.
357 2. _attempt_slice_retry: may raise exception to taskqueue.
358 3. _save_state_and_schedule_next: may raise exception when taskqueue/db
361 This handler should try to free the lease on every exceptional exit point.
364 shard_state: model.ShardState.
365 slice_retry: whether to count this as a failed slice execution.
369 fresh_state = model.ShardState.get_by_shard_id(shard_state.shard_id)
370 if fresh_state and fresh_state.active:
372 fresh_state.slice_start_time = None
373 fresh_state.slice_request_id = None
375 fresh_state.slice_retries += 1
383 "Release lock
for shard
%s failed
. Wait
for lease to expire
.",
384 shard_state.shard_id)
386 def _maintain_LC(self, obj, slice_id, last_slice=False, begin_slice=True,
387 shard_ctx=None, slice_ctx=None):
388 """Makes sure shard life cycle interface are respected.
391 obj: the obj that may have implemented _ShardLifeCycle.
392 slice_id: current slice_id
393 last_slice: whether this is the last slice.
394 begin_slice: whether this is the beginning or the end of a slice.
395 shard_ctx: shard ctx for dependency injection. If None, it will be read
397 slice_ctx: slice ctx for dependency injection. If None, it will be read
400 if obj is None or not isinstance(obj, shard_life_cycle._ShardLifeCycle):
403 shard_context = shard_ctx or self.shard_context
404 slice_context = slice_ctx or self.slice_context
407 obj.begin_shard(shard_context)
408 obj.begin_slice(slice_context)
410 obj.end_slice(slice_context)
412 obj.end_shard(shard_context)
414 def _lc_start_slice(self, tstate, slice_id):
415 self._maintain_LC(tstate.output_writer, slice_id)
416 self._maintain_LC(tstate.input_reader, slice_id)
417 self._maintain_LC(tstate.handler, slice_id)
419 def _lc_end_slice(self, tstate, slice_id, last_slice=False):
420 self._maintain_LC(tstate.handler, slice_id, last_slice=last_slice,
422 self._maintain_LC(tstate.input_reader, slice_id, last_slice=last_slice,
424 self._maintain_LC(tstate.output_writer, slice_id, last_slice=last_slice,
430 This method has to be careful to pass the same ShardState instance to
431 its subroutines calls if the calls mutate or read from ShardState.
432 Note especially that Context instance caches and updates the ShardState
436 Set HTTP status code and always returns None.
439 self._start_time = self._time()
440 shard_id = self.request.headers[util._MR_SHARD_ID_TASK_HEADER]
441 mr_id = self.request.headers[util._MR_ID_TASK_HEADER]
442 spec = model.MapreduceSpec._get_mapreduce_spec(mr_id)
443 shard_state, control = db.get([
444 model.ShardState.get_key_by_shard_id(shard_id),
445 model.MapreduceControl.get_key_by_job_id(mr_id),
449 ctx = context.Context(spec, shard_state,
450 task_retry_count=self.task_retry_count())
451 context.Context._set(ctx)
454 tstate = model.TransientShardState.from_request(self.request)
458 is_this_a_retry = shard_state.acquired_once
459 task_directive = self._try_acquire_lease(shard_state, tstate)
460 if task_directive in (self._TASK_DIRECTIVE.RETRY_TASK,
461 self._TASK_DIRECTIVE.DROP_TASK):
462 return self.__return(shard_state, tstate, task_directive)
463 assert task_directive == self._TASK_DIRECTIVE.PROCEED_TASK
466 if control and control.command == model.MapreduceControl.ABORT:
467 task_directive = self._TASK_DIRECTIVE.ABORT_SHARD
468 return self.__return(shard_state, tstate, task_directive)
471 if (is_this_a_retry and
472 parameters.config.TASK_MAX_DATA_PROCESSING_ATTEMPTS <= 1):
473 task_directive = self._TASK_DIRECTIVE.RETRY_SHARD
474 return self.__return(shard_state, tstate, task_directive)
478 util._set_ndb_cache_policy()
480 job_config = map_job.JobConfig._to_map_job_config(
482 os.environ.get("HTTP_X_APPENGINE_QUEUENAME
"))
483 job_context = map_job_context.JobContext(job_config)
484 self.shard_context = map_job_context.ShardContext(job_context, shard_state)
485 self.slice_context = map_job_context.SliceContext(self.shard_context,
489 slice_id = tstate.slice_id
490 self._lc_start_slice(tstate, slice_id)
492 if shard_state.is_input_finished():
493 self._lc_end_slice(tstate, slice_id, last_slice=True)
495 if (tstate.output_writer and
496 isinstance(tstate.output_writer, output_writers.OutputWriter)):
501 tstate.output_writer.finalize(ctx, shard_state)
502 shard_state.set_for_success()
503 return self.__return(shard_state, tstate, task_directive)
506 task_directive = self._attempt_slice_recovery(shard_state, tstate)
507 if task_directive != self._TASK_DIRECTIVE.PROCEED_TASK:
508 return self.__return(shard_state, tstate, task_directive)
510 last_slice = self._process_inputs(
511 tstate.input_reader, shard_state, tstate, ctx)
513 self._lc_end_slice(tstate, slice_id)
521 shard_state.set_input_finished()
524 logging.warning("Shard
%s got error
.", shard_state.shard_id)
525 logging.error(traceback.format_exc())
528 if type(e) is errors.FailJobError:
529 logging.error("Got FailJobError
.")
530 task_directive = self._TASK_DIRECTIVE.FAIL_TASK
532 task_directive = self._TASK_DIRECTIVE.RETRY_SLICE
534 self.__return(shard_state, tstate, task_directive)
536 def __return(self, shard_state, tstate, task_directive):
537 """Handler should always call this as the last statement."""
538 task_directive = self._set_state(shard_state, tstate, task_directive)
539 self._save_state_and_schedule_next(shard_state, tstate, task_directive)
541 def _process_inputs(self,
546 """Read inputs, process them, and write out outputs.
548 This is the core logic of MapReduce. It reads inputs from input reader,
549 invokes user specified mapper function, and writes output with
550 output writer. It also updates shard_state accordingly.
551 e.g. if shard processing is done, set shard_state.active to False.
553 If errors.FailJobError is caught, it will fail this MR job.
554 All other exceptions will be logged and raised to taskqueue for retry
555 until the number of retries exceeds a limit.
558 input_reader: input reader.
559 shard_state: shard state.
560 tstate: transient shard state.
561 ctx: mapreduce context.
564 Whether this shard has finished processing all its input split.
566 processing_limit = self._processing_limit(tstate.mapreduce_spec)
567 if processing_limit == 0:
570 finished_shard = True
572 iterator = iter(input_reader)
576 entity = iterator.next()
577 except StopIteration:
590 if isinstance(entity, db.Model):
591 shard_state.last_work_item = repr(entity.key())
592 elif isinstance(entity, ndb.Model):
593 shard_state.last_work_item = repr(entity.key)
595 shard_state.last_work_item = repr(entity)[:100]
597 processing_limit -= 1
599 if not self._process_datum(
600 entity, input_reader, ctx, tstate):
601 finished_shard = False
603 elif processing_limit == 0:
604 finished_shard = False
608 self.slice_context.incr(
609 context.COUNTER_MAPPER_WALLTIME_MS,
610 int((self._time() - self._start_time)*1000))
612 return finished_shard
614 def _process_datum(self, data, input_reader, ctx, transient_shard_state):
615 """Process a single data piece.
617 Call mapper handler on the data.
620 data: a datum to process.
621 input_reader: input reader.
622 ctx: mapreduce context
623 transient_shard_state: transient shard state.
626 True if scan should be continued, False if scan should be stopped.
628 if data is not input_readers.ALLOW_CHECKPOINT:
629 self.slice_context.incr(context.COUNTER_MAPPER_CALLS)
631 handler = transient_shard_state.handler
633 if isinstance(handler, map_job.Mapper):
634 handler(self.slice_context, data)
636 if input_reader.expand_parameters:
637 result = handler(*data)
639 result = handler(data)
641 if util.is_generator(result):
642 for output in result:
643 if isinstance(output, operation.Operation):
646 output_writer = transient_shard_state.output_writer
647 if not output_writer:
649 "Handler yielded
%s, but no output writer
is set.", output)
651 output_writer.write(output)
653 if self._time() - self._start_time >= parameters.config._SLICE_DURATION_SEC:
657 def _set_state(self, shard_state, tstate, task_directive):
658 """Set shard_state and tstate based on task_directive.
661 shard_state: model.ShardState for current shard.
662 tstate: model.TransientShardState for current shard.
663 task_directive: self._TASK_DIRECTIVE for current shard.
666 A _TASK_DIRECTIVE enum.
667 PROCEED_TASK if task should proceed normally.
668 RETRY_SHARD if shard should be retried.
669 RETRY_SLICE if slice should be retried.
670 FAIL_TASK if sahrd should fail.
671 RECOVER_SLICE if slice should be recovered.
672 ABORT_SHARD if shard should be aborted.
673 RETRY_TASK if task should be retried.
674 DROP_TASK if task should be dropped.
676 if task_directive in (self._TASK_DIRECTIVE.RETRY_TASK,
677 self._TASK_DIRECTIVE.DROP_TASK):
678 return task_directive
680 if task_directive == self._TASK_DIRECTIVE.ABORT_SHARD:
681 shard_state.set_for_abort()
682 return task_directive
684 if task_directive == self._TASK_DIRECTIVE.PROCEED_TASK:
685 shard_state.advance_for_next_slice()
686 tstate.advance_for_next_slice()
687 return task_directive
689 if task_directive == self._TASK_DIRECTIVE.RECOVER_SLICE:
690 tstate.advance_for_next_slice(recovery_slice=True)
691 shard_state.advance_for_next_slice(recovery_slice=True)
692 return task_directive
694 if task_directive == self._TASK_DIRECTIVE.RETRY_SLICE:
695 task_directive = self._attempt_slice_retry(shard_state, tstate)
696 if task_directive == self._TASK_DIRECTIVE.RETRY_SHARD:
697 task_directive = self._attempt_shard_retry(shard_state, tstate)
698 if task_directive == self._TASK_DIRECTIVE.FAIL_TASK:
699 shard_state.set_for_failure()
701 return task_directive
703 def _save_state_and_schedule_next(self, shard_state, tstate, task_directive):
704 """Save state and schedule task.
706 Save shard state to datastore.
707 Schedule next slice if needed.
708 Set HTTP response code.
709 No modification to any shard_state or tstate.
712 shard_state: model.ShardState for current shard.
713 tstate: model.TransientShardState for current shard.
714 task_directive: enum _TASK_DIRECTIVE.
717 The task to retry if applicable.
719 spec = tstate.mapreduce_spec
721 if task_directive == self._TASK_DIRECTIVE.DROP_TASK:
723 if task_directive in (self._TASK_DIRECTIVE.RETRY_SLICE,
724 self._TASK_DIRECTIVE.RETRY_TASK):
726 return self.retry_task()
727 elif task_directive == self._TASK_DIRECTIVE.ABORT_SHARD:
728 logging.info("Aborting shard
%d of job
'%s'",
729 shard_state.shard_number, shard_state.mapreduce_id)
731 elif task_directive == self._TASK_DIRECTIVE.FAIL_TASK:
732 logging.critical("Shard
%s failed permanently
.", shard_state.shard_id)
734 elif task_directive == self._TASK_DIRECTIVE.RETRY_SHARD:
735 logging.warning("Shard
%s is going to be attempted
for the
%s time
.",
736 shard_state.shard_id,
737 shard_state.retries + 1)
738 task = self._state_to_task(tstate, shard_state)
739 elif task_directive == self._TASK_DIRECTIVE.RECOVER_SLICE:
740 logging.warning("Shard
%s slice %s is being recovered
.",
741 shard_state.shard_id,
742 shard_state.slice_id)
743 task = self._state_to_task(tstate, shard_state)
745 assert task_directive == self._TASK_DIRECTIVE.PROCEED_TASK
746 countdown = self._get_countdown_for_next_slice(spec)
747 task = self._state_to_task(tstate, shard_state, countdown=countdown)
750 queue_name = os.environ.get("HTTP_X_APPENGINE_QUEUENAME
",
754 config = util.create_datastore_write_config(spec)
756 @db.transactional(retries=5)
758 """The Transaction helper."""
759 fresh_shard_state = model.ShardState.get_by_shard_id(tstate.shard_id)
760 if not fresh_shard_state:
762 if (not fresh_shard_state.active or
763 "worker_active_state_collision
" in _TEST_INJECTED_FAULTS):
764 logging.warning("Shard
%s is not active
. Possible spurious task
"
765 "execution
. Dropping this task
.", tstate.shard_id)
766 logging.warning("Datastore
's %s", str(fresh_shard_state))
767 logging.warning("Slice's
%s", str(shard_state))
769 fresh_shard_state.copy_from(shard_state)
770 fresh_shard_state.put(config=config)
775 if fresh_shard_state.active:
778 self._add_task(task, spec, queue_name)
782 except (datastore_errors.Error,
784 runtime.DeadlineExceededError,
785 apiproxy_errors.Error), e:
787 "Can
't transactionally continue shard. "
788 "Will retry slice %s %s for the %s time.",
791 self.task_retry_count() + 1)
792 self._try_free_lease(shard_state)
795 def _attempt_slice_recovery(self, shard_state, tstate):
798 This is run when a slice had been previously attempted and output
799 may have been written. If an output writer requires slice recovery,
800 we run those logic to remove output duplicates. Otherwise we just retry
803 If recovery is needed, then the entire slice will be dedicated
804 to recovery logic. No data processing will take place. Thus we call
805 the slice "recovery slice". This is needed for correctness:
806 An output writer instance can be out of sync from its physical
807 medium only when the slice dies after acquring the shard lock but before
808 committing shard state to db. The worst failure case is when
809 shard state failed to commit after the NAMED task for the next slice was
810 added. Thus, recovery slice has a special logic to increment current
811 slice_id n to n+2. If the task for n+1 had been added, it will be dropped
812 because it is behind shard state.
815 shard_state: an instance of Model.ShardState.
816 tstate: an instance of Model.TransientShardState.
819 _TASK_DIRECTIVE.PROCEED_TASK to continue with this retry.
820 _TASK_DIRECTIVE.RECOVER_SLICE to recover this slice.
821 The next slice will start at the same input as
822 this slice but output to a new instance of output writer.
823 Combining outputs from all writer instances is up to implementation.
825 mapper_spec = tstate.mapreduce_spec.mapper
826 if not (tstate.output_writer and
827 tstate.output_writer._supports_slice_recovery(mapper_spec)):
828 return self._TASK_DIRECTIVE.PROCEED_TASK
830 tstate.output_writer = tstate.output_writer._recover(
831 tstate.mapreduce_spec, shard_state.shard_number,
832 shard_state.retries + 1)
833 return self._TASK_DIRECTIVE.RECOVER_SLICE
835 def _attempt_shard_retry(self, shard_state, tstate):
836 """Whether to retry shard.
838 This method may modify shard_state and tstate to prepare for retry or fail.
841 shard_state: model.ShardState for current shard.
842 tstate: model.TransientShardState for current shard.
845 A _TASK_DIRECTIVE enum. RETRY_SHARD if shard should be retried.
848 shard_attempts = shard_state.retries + 1
850 if shard_attempts >= parameters.config.SHARD_MAX_ATTEMPTS:
852 "Shard attempt %s exceeded %s max attempts.",
853 shard_attempts, parameters.config.SHARD_MAX_ATTEMPTS)
854 return self._TASK_DIRECTIVE.FAIL_TASK
855 if tstate.output_writer and (
856 not tstate.output_writer._supports_shard_retry(tstate)):
857 logging.warning("Output writer %s does not support shard retry.",
858 tstate.output_writer.__class__.__name__)
859 return self._TASK_DIRECTIVE.FAIL_TASK
861 shard_state.reset_for_retry()
862 logging.warning("Shard %s attempt %s failed with up to %s attempts.",
863 shard_state.shard_id,
865 parameters.config.SHARD_MAX_ATTEMPTS)
867 if tstate.output_writer:
868 output_writer = tstate.output_writer.create(
869 tstate.mapreduce_spec, shard_state.shard_number, shard_attempts + 1)
870 tstate.reset_for_retry(output_writer)
871 return self._TASK_DIRECTIVE.RETRY_SHARD
873 def _attempt_slice_retry(self, shard_state, tstate):
874 """Attempt to retry this slice.
876 This method may modify shard_state and tstate to prepare for retry or fail.
879 shard_state: model.ShardState for current shard.
880 tstate: model.TransientShardState for current shard.
883 A _TASK_DIRECTIVE enum. RETRY_SLICE if slice should be retried.
884 RETRY_SHARD if shard retry should be attempted.
886 if (shard_state.slice_retries + 1 <
887 parameters.config.TASK_MAX_DATA_PROCESSING_ATTEMPTS):
889 "Slice %s %s failed for the %s of up to %s attempts "
890 "(%s of %s taskqueue execution attempts). "
894 shard_state.slice_retries + 1,
895 parameters.config.TASK_MAX_DATA_PROCESSING_ATTEMPTS,
896 self.task_retry_count() + 1,
897 parameters.config.TASK_MAX_ATTEMPTS)
902 self._try_free_lease(shard_state, slice_retry=True)
903 return self._TASK_DIRECTIVE.RETRY_SLICE
905 if parameters.config.TASK_MAX_DATA_PROCESSING_ATTEMPTS > 0:
906 logging.warning("Slice attempt %s exceeded %s max attempts.",
907 self.task_retry_count() + 1,
908 parameters.config.TASK_MAX_DATA_PROCESSING_ATTEMPTS)
909 return self._TASK_DIRECTIVE.RETRY_SHARD
def get_task_name(shard_id, slice_id, retry=0):
  """Compute single worker task name.

  Args:
    shard_id: shard id.
    slice_id: slice id.
    retry: current shard retry count.

  Returns:
    task name which should be used to process specified shard/slice.
  """
  name_parts = (shard_id, slice_id, retry)
  return "appengine-mrshard-%s-%s-retry-%s" % name_parts
928 def _get_countdown_for_next_slice(self, spec):
929 """Get countdown for next slice's task
.
931 When user sets processing rate
, we
set countdown to delay task execution
.
934 spec
: model
.MapreduceSpec
940 if self._processing_limit(spec) != -1:
942 int(parameters.config._SLICE_DURATION_SEC -
943 (self._time() - self._start_time)), 0)
947 def _state_to_task(cls,
952 """Generate task
for slice according to current states
.
955 tstate
: An instance of TransientShardState
.
956 shard_state
: An instance of ShardState
.
957 eta
: Absolute time when the MR should execute
. May
not be specified
958 if 'countdown' is also supplied
. This may be timezone
-aware
or
960 countdown
: Time
in seconds into the future that this MR should execute
.
964 A model
.HugeTask instance
for the
slice specified by current states
.
966 base_path = tstate.base_path
968 task_name = MapperWorkerCallbackHandler.get_task_name(
973 headers = util._get_task_headers(tstate.mapreduce_spec.mapreduce_id)
974 headers[util._MR_SHARD_ID_TASK_HEADER] = tstate.shard_id
976 worker_task = model.HugeTask(
977 url=base_path + "/worker_callback/" + tstate.shard_id,
978 params=tstate.to_dict(),
991 """Schedule
slice scanning by adding it to the task queue
.
994 worker_task
: a model
.HugeTask task
for slice. This
is NOT a taskqueue
996 mapreduce_spec
: an instance of model
.MapreduceSpec
.
997 queue_name
: Optional queue to run on
; uses the current queue of
998 execution
or the default queue
if unspecified
.
1000 if not _run_task_hook(mapreduce_spec.get_hooks(),
1001 "enqueue_worker_task",
1007 worker_task.add(queue_name)
1008 except (taskqueue.TombstonedTaskError,
1009 taskqueue.TaskAlreadyExistsError), e:
1010 logging.warning("Task %r already exists. %s: %s",
1015 def _processing_limit(self, spec):
1016 """Get the limit on the number of
map calls allowed by this
slice.
1019 spec
: a Mapreduce spec
.
1022 The limit
as a positive
int if specified by user
. -1 otherwise
.
1024 processing_rate = float(spec.mapper.params.get("processing_rate", 0))
1025 slice_processing_limit = -1
1026 if processing_rate > 0:
1027 slice_processing_limit = int(math.ceil(
1028 parameters.config._SLICE_DURATION_SEC*processing_rate/
1029 int(spec.mapper.shard_count)))
1030 return slice_processing_limit
1035 def _schedule_slice(cls,
1041 """Schedule
slice scanning by adding it to the task queue
.
1044 shard_state
: An instance of ShardState
.
1045 tstate
: An instance of TransientShardState
.
1046 queue_name
: Optional queue to run on
; uses the current queue of
1047 execution
or the default queue
if unspecified
.
1048 eta
: Absolute time when the MR should execute
. May
not be specified
1049 if 'countdown' is also supplied
. This may be timezone
-aware
or
1051 countdown
: Time
in seconds into the future that this MR should execute
.
1054 queue_name = queue_name or os.environ.get("HTTP_X_APPENGINE_QUEUENAME",
1056 task = cls._state_to_task(tstate, shard_state, eta, countdown)
1057 cls._add_task(task, tstate.mapreduce_spec, queue_name)
1060 class ControllerCallbackHandler(base_handler.HugeTaskHandler):
1061 """Supervises mapreduce execution
.
1063 Is also responsible
for gathering execution status
from shards together
.
1065 This task
is "continuously" running by adding itself again to taskqueue
if
1066 and only
if mapreduce
is still active
. A mapreduce
is active
if it has
1067 actively running shards
.
def __init__(self, *args):
  """Initializes the handler.

  Args:
    *args: positional arguments forwarded unchanged to the base handler.
  """
  super(ControllerCallbackHandler, self).__init__(*args)
  # Clock function kept on the instance; the job's last_poll_time is derived
  # from self._time().
  self._time = time.time
1075 def _drop_gracefully(self):
1076 """Gracefully drop controller task
.
1078 This method
is called when decoding controller task payload failed
.
1079 Upon this we mark ShardState
and MapreduceState
as failed so all
1082 Writing to datastore
is forced (ignore read
-only mode
) because we
1083 want the tasks to stop badly
, and if force_writes was
False,
1084 the job would have never been started
.
1086 mr_id = self.request.headers[util._MR_ID_TASK_HEADER]
1087 state = model.MapreduceState.get_by_job_id(mr_id)
1088 if not state or not state.active:
1091 state.active = False
1092 state.result_status = model.MapreduceState.RESULT_FAILED
1093 config = util.create_datastore_write_config(state.mapreduce_spec)
1095 for ss in model.ShardState.find_all_by_mapreduce_state(state):
1097 ss.set_for_failure()
1100 if len(puts) > model.ShardState._MAX_STATES_IN_MEMORY:
1101 db.put(puts, config=config)
1103 db.put(puts, config=config)
1105 db.put(state, config=config)
1108 """Handle request
."""
1109 spec = model.MapreduceSpec.from_json_str(
1110 self.request.get("mapreduce_spec"))
1111 state, control = db.get([
1112 model.MapreduceState.get_key_by_job_id(spec.mapreduce_id),
1113 model.MapreduceControl.get_key_by_job_id(spec.mapreduce_id),
1117 logging.warning("State not found for MR '%s'; dropping controller task.",
1120 if not state.active:
1122 "MR %r is not active. Looks like spurious controller task execution.",
1124 self._clean_up_mr(spec)
1127 shard_states = model.ShardState.find_all_by_mapreduce_state(state)
1128 self._update_state_from_shard_states(state, shard_states, control)
1131 ControllerCallbackHandler.reschedule(
1132 state, spec, self.serial_id() + 1)
1134 def _update_state_from_shard_states(self, state, shard_states, control):
1135 """Update mr state by examing shard states
.
1138 state
: current mapreduce state
as MapreduceState
.
1139 shard_states
: an iterator over shard states
.
1140 control
: model
.MapreduceControl entity
.
1143 state.active_shards, state.aborted_shards, state.failed_shards = 0, 0, 0
1145 processed_counts = []
1146 state.counters_map.clear()
1149 for s in shard_states:
1152 state.active_shards += 1
1153 if s.result_status == model.ShardState.RESULT_ABORTED:
1154 state.aborted_shards += 1
1155 elif s.result_status == model.ShardState.RESULT_FAILED:
1156 state.failed_shards += 1
1159 state.counters_map.add_map(s.counters_map)
1160 processed_counts.append(s.counters_map.get(context.COUNTER_MAPPER_CALLS))
1162 state.set_processed_counts(processed_counts)
1163 state.last_poll_time = datetime.datetime.utcfromtimestamp(self._time())
1165 spec = state.mapreduce_spec
1167 if total_shards != spec.mapper.shard_count:
1168 logging.error("Found %d shard states. Expect %d. "
1169 "Issuing abort command to job '%s'",
1170 total_shards, spec.mapper.shard_count,
1173 model.MapreduceControl.abort(spec.mapreduce_id)
1177 state.active = bool(state.active_shards)
1178 if not control and (state.failed_shards or state.aborted_shards):
1180 model.MapreduceControl.abort(spec.mapreduce_id)
1182 if not state.active:
1184 if state.failed_shards or not total_shards:
1185 state.result_status = model.MapreduceState.RESULT_FAILED
1188 elif state.aborted_shards:
1189 state.result_status = model.MapreduceState.RESULT_ABORTED
1191 state.result_status = model.MapreduceState.RESULT_SUCCESS
1192 self._finalize_outputs(spec, state)
1193 self._finalize_job(spec, state)
1195 @db.transactional(retries=5)
1197 """The helper
for storing the state
."""
1198 fresh_state = model.MapreduceState.get_by_job_id(spec.mapreduce_id)
1201 if not fresh_state.active:
1203 "Job %s is not active. Looks like spurious task execution. "
1204 "Dropping controller task.", spec.mapreduce_id)
1206 config = util.create_datastore_write_config(spec)
1207 state.put(config=config)
def serial_id(self):
  """Get serial unique identifier of this task from request.

  Returns:
    serial identifier as int.
  """
  raw_serial = self.request.get("serial_id")
  return int(raw_serial)
def _finalize_outputs(cls, mapreduce_spec, mapreduce_state):
    """Let the output writer finalize a successfully completed job.

    Nothing happens when the mapper has no output writer class or when the
    job's result status is anything other than RESULT_SUCCESS.

    Args:
      mapreduce_spec: an instance of MapreduceSpec.
      mapreduce_state: an instance of MapreduceState.
    """
    mapper = mapreduce_spec.mapper
    if not mapper.output_writer_class():
        return
    if mapreduce_state.result_status != model.MapreduceState.RESULT_SUCCESS:
        return
    mapper.output_writer_class().finalize_job(mapreduce_state)
def _finalize_job(cls, mapreduce_spec, mapreduce_state):
    """Persist the final job state and kick off post-job work.

    The state is saved and the optional done-callback task is enqueued inside
    a single datastore transaction; afterwards the clean up is scheduled.
    This method is idempotent.

    Args:
      mapreduce_spec: an instance of MapreduceSpec.
      mapreduce_state: an instance of MapreduceState.
    """
    config = util.create_datastore_write_config(mapreduce_spec)
    job_params = mapreduce_spec.params
    queue_name = util.get_queue_name(
        job_params.get(model.MapreduceSpec.PARAM_DONE_CALLBACK_QUEUE))
    done_callback = job_params.get(model.MapreduceSpec.PARAM_DONE_CALLBACK)

    # A done task only exists when the job was configured with a callback.
    done_task = None
    if done_callback:
        callback_headers = util._get_task_headers(
            mapreduce_spec.mapreduce_id, util.CALLBACK_MR_ID_TASK_HEADER)
        callback_method = mapreduce_spec.params.get(
            "done_callback_method", "POST")
        done_task = taskqueue.Task(
            url=done_callback,
            headers=callback_headers,
            method=callback_method)

    @db.transactional(retries=5)
    def _store_final_state():
        """Helper to store state."""
        fresh_state = model.MapreduceState.get_by_job_id(
            mapreduce_spec.mapreduce_id)
        if not fresh_state.active:
            logging.warning(
                "Job %s is not active. Looks like spurious task execution. "
                "Dropping task.", mapreduce_spec.mapreduce_id)
            return
        mapreduce_state.put(config=config)
        if not done_task:
            return
        # A hook may take over enqueueing; add the task ourselves otherwise.
        handled_by_hook = _run_task_hook(
            mapreduce_spec.get_hooks(),
            "enqueue_done_task",
            done_task,
            queue_name)
        if not handled_by_hook:
            done_task.add(queue_name, transactional=True)

    _store_final_state()
    logging.info("Final result for job '%s' is '%s'",
                 mapreduce_spec.mapreduce_id, mapreduce_state.result_status)
    cls._clean_up_mr(mapreduce_spec)
def _clean_up_mr(cls, mapreduce_spec):
    """Schedule the finalize task for the given job.

    Args:
      mapreduce_spec: specification of the mapreduce to clean up after.
    """
    schedule_finalize = FinalizeJobHandler.schedule
    schedule_finalize(mapreduce_spec)
def get_task_name(mapreduce_spec, serial_id):
    """Build the name of a single controller task.

    Args:
      mapreduce_spec: specification of the mapreduce.
      serial_id: id of the invocation as int.

    Returns:
      The task name, made of a fixed prefix, the mapreduce id and the
      serial id.
    """
    job_id = mapreduce_spec.mapreduce_id
    return "appengine-mrcontrol-%s-%s" % (job_id, serial_id)
def controller_parameters(mapreduce_spec, serial_id):
    """Assemble the payload for a controller task.

    The returned map is meant to be used as the task payload and holds
    everything the controller needs to perform its function.

    Args:
      mapreduce_spec: specification of the mapreduce.
      serial_id: id of the invocation as int.

    Returns:
      string->string map of parameters to be used as task payload.
    """
    payload = {}
    payload["mapreduce_spec"] = mapreduce_spec.to_json_str()
    payload["serial_id"] = str(serial_id)
    return payload
1323 """Schedule new update status callback task
.
1326 mapreduce_state
: mapreduce state
as model
.MapreduceState
1327 mapreduce_spec
: mapreduce specification
as MapreduceSpec
.
1328 serial_id
: id of the invocation
as int.
1329 queue_name
: The queue to schedule this task on
. Will use the current
1330 queue of execution
if not supplied
.
1332 task_name = ControllerCallbackHandler.get_task_name(
1333 mapreduce_spec, serial_id)
1334 task_params = ControllerCallbackHandler.controller_parameters(
1335 mapreduce_spec, serial_id)
1337 queue_name = os.environ.get("HTTP_X_APPENGINE_QUEUENAME", "default")
1339 controller_callback_task = model.HugeTask(
1340 url=(mapreduce_spec.params["base_path"] + "/controller_callback/" +
1341 mapreduce_spec.mapreduce_id),
1342 name=task_name, params=task_params,
1343 countdown=parameters.config._CONTROLLER_PERIOD_SEC,
1344 parent=mapreduce_state,
1345 headers=util._get_task_headers(mapreduce_spec.mapreduce_id))
1347 if not _run_task_hook(mapreduce_spec.get_hooks(),
1348 "enqueue_controller_task",
1349 controller_callback_task,
1352 controller_callback_task.add(queue_name)
1353 except (taskqueue.TombstonedTaskError,
1354 taskqueue.TaskAlreadyExistsError), e:
1355 logging.warning("Task %r with params %r already exists. %s: %s",
1356 task_name, task_params, e.__class__, e)
1359 class KickOffJobHandler(base_handler.TaskQueueHandler):
1360 """Taskqueue handler which kicks off a mapreduce processing
.
1362 This handler
is idempotent
.
1365 The Model
.MapreduceState entity
for this mr
is already created
and
1366 saved to datastore by StartJobHandler
._start
_map
.
1369 mapreduce_id
: in string
.
1373 _SERIALIZED_INPUT_READERS_KEY = "input_readers_for_mr_%s"
1376 """Handles kick off request
."""
1378 mr_id = self.request.get("mapreduce_id")
1380 logging.info("Processing kickoff for job %s", mr_id)
1381 state = model.MapreduceState.get_by_job_id(mr_id)
1382 if not self._check_mr_state(state, mr_id):
1386 readers, serialized_readers_entity = self._get_input_readers(state)
1389 logging.warning("Found no mapper input data to process.")
1390 state.active = False
1391 state.result_status = model.MapreduceState.RESULT_SUCCESS
1392 ControllerCallbackHandler._finalize_job(
1393 state.mapreduce_spec, state)
1397 self._setup_output_writer(state)
1401 result = self._save_states(state, serialized_readers_entity)
1403 readers, _ = self._get_input_readers(state)
1407 queue_name = self.request.headers.get("X-AppEngine-QueueName")
1408 KickOffJobHandler._schedule_shards(state.mapreduce_spec, readers,
1410 state.mapreduce_spec.params["base_path"],
1413 ControllerCallbackHandler.reschedule(
1414 state, state.mapreduce_spec, serial_id=0, queue_name=queue_name)
def _drop_gracefully(self):
    """Mark a job whose kickoff failed as FAILED and finalize it."""
    job_id = self.request.get("mapreduce_id")
    logging.error("Failed to kick off job %s", job_id)

    job_state = model.MapreduceState.get_by_job_id(job_id)
    if not self._check_mr_state(job_state, job_id):
        return

    # Send the abort command before the job is finalized.
    write_config = util.create_datastore_write_config(
        job_state.mapreduce_spec)
    model.MapreduceControl.abort(job_id, config=write_config)

    # Record the failure and run the shared finalization path.
    job_state.active = False
    job_state.result_status = model.MapreduceState.RESULT_FAILED
    ControllerCallbackHandler._finalize_job(
        job_state.mapreduce_spec, job_state)
def _get_input_readers(self, state):
    """Load previously saved input readers or split the input afresh.

    Args:
      state: a MapreduceState model.

    Returns:
      A tuple: (a list of input readers, a model._HugeTaskPayload entity).
      The payload entity contains the json serialized input readers.
      (None, None) when input reader splitting returned no data to process.
    """
    payload_key = (self._SERIALIZED_INPUT_READERS_KEY %
                   state.key().id_or_name())
    payload_entity = model._HugeTaskPayload.get_by_key_name(
        payload_key, parent=state)
    already_saved = payload_entity is not None

    reader_cls = state.mapreduce_spec.mapper.input_reader_class()
    # Readers derived from map_job.InputReader are split from a job config;
    # all other readers are split from the mapper spec itself.
    split_param = state.mapreduce_spec.mapper
    if issubclass(reader_cls, map_job.InputReader):
        split_param = map_job.JobConfig._to_map_job_config(
            state.mapreduce_spec,
            os.environ.get("HTTP_X_APPENGINE_QUEUENAME"))

    if already_saved:
        readers = [
            reader_cls.from_json_str(reader_json)
            for reader_json in simplejson.loads(payload_entity.payload)]
    else:
        readers = reader_cls.split_input(split_param)

    if not readers:
        return None, None

    # Record the actual shard count on both the spec and the state.
    shard_total = len(readers)
    state.mapreduce_spec.mapper.shard_count = shard_total
    state.active_shards = shard_total

    if not already_saved:
        # The payload entity is created with the state entity as its parent.
        payload_entity = model._HugeTaskPayload(
            key_name=payload_key, parent=state)
        serialized = [reader.to_json_str() for reader in readers]
        payload_entity.payload = simplejson.dumps(serialized)
    return readers, payload_entity
1479 def _setup_output_writer(self, state):
1480 if not state.writer_state:
1481 output_writer_class = state.mapreduce_spec.mapper.output_writer_class()
1482 if output_writer_class:
1483 output_writer_class.init_job(state)
def _save_states(self, state, serialized_readers_entity):
    """Save the job state together with its serialized input readers.

    NOTE(review): the docstring contract speaks of a transaction; confirm
    that this method is wrapped in a datastore transaction by its decorator.

    Args:
      state: a model.MapreduceState entity.
      serialized_readers_entity: a model._HugeTaskPayload entity containing
        json serialized input readers.

    Returns:
      False if a fatal error is encountered and this task should be dropped
      immediately. True if transaction is successful. None if a previous
      attempt of this same transaction has already succeeded.
    """
    job_id = state.key().id_or_name()
    stored_state = model.MapreduceState.get_by_job_id(job_id)
    if not self._check_mr_state(stored_state, job_id):
        return False

    # Active shards already recorded: nothing left to save.
    if stored_state.active_shards != 0:
        logging.warning(
            "Mapreduce %s already has active shards. Looks like spurious task "
            "execution.", job_id)
        return None

    write_config = util.create_datastore_write_config(state.mapreduce_spec)
    entities = [state, serialized_readers_entity]
    db.put(entities, config=write_config)
    return True
1513 def _schedule_shards(cls,
1519 """Prepares shard states
and schedules their execution
.
1521 Even though this method does
not schedule shard task
and save shard state
1522 transactionally
, it
's safe for taskqueue to retry this logic because
1523 the initial shard_state for each shard is the same from any retry.
1524 This is an important yet reasonable assumption on model.ShardState.
1527 spec: mapreduce specification as MapreduceSpec.
1528 readers: list of InputReaders describing shard splits.
1529 queue_name: The queue to run this job on.
1530 base_path: The base url path of mapreduce callbacks.
1531 mr_state: The MapReduceState of current job.
1535 for shard_number, input_reader in enumerate(readers):
1536 shard_state = model.ShardState.create_new(spec.mapreduce_id, shard_number)
1537 shard_state.shard_description = str(input_reader)
1538 shard_states.append(shard_state)
1541 existing_shard_states = db.get(shard.key() for shard in shard_states)
1542 existing_shard_keys = set(shard.key() for shard in existing_shard_states
1543 if shard is not None)
1547 db.put((shard for shard in shard_states
1548 if shard.key() not in existing_shard_keys),
1549 config=util.create_datastore_write_config(spec))
1552 writer_class = spec.mapper.output_writer_class()
1553 writers = [None] * len(readers)
1555 for shard_number, shard_state in enumerate(shard_states):
1556 writers[shard_number] = writer_class.create(
1557 mr_state.mapreduce_spec,
1558 shard_state.shard_number, shard_state.retries + 1,
1559 mr_state.writer_state)
1564 for shard_number, (input_reader, output_writer) in enumerate(
1565 zip(readers, writers)):
1566 shard_id = model.ShardState.shard_id_from_number(
1567 spec.mapreduce_id, shard_number)
1568 task = MapperWorkerCallbackHandler._state_to_task(
1569 model.TransientShardState(
1570 base_path, spec, shard_id, 0, input_reader, input_reader,
1571 output_writer=output_writer,
1572 handler=spec.mapper.handler),
1573 shard_states[shard_number])
1574 MapperWorkerCallbackHandler._add_task(task,
1579 def _check_mr_state(cls, state, mr_id):
1580 """Check MapreduceState.
1583 state: an MapreduceState instance.
1584 mr_id: mapreduce id.
1587 True if state is valid. False if not and this task should be dropped.
1591 "Mapreduce State for job %s is missing. Dropping Task.",
1594 if not state.active:
1596 "Mapreduce %s is not active. Looks like spurious task "
1597 "execution. Dropping Task.", mr_id)
1602 class StartJobHandler(base_handler.PostJsonHandler):
1603 """Command handler starts a mapreduce job.
1605 This handler allows user to start a mr via a web form. It's _start_map
1606 method can also be used independently to start a mapreduce
.
1610 """Handles start request
."""
1612 mapreduce_name = self._get_required_param("name")
1613 mapper_input_reader_spec = self._get_required_param("mapper_input_reader")
1614 mapper_handler_spec = self._get_required_param("mapper_handler")
1615 mapper_output_writer_spec = self.request.get("mapper_output_writer")
1616 mapper_params = self._get_params(
1617 "mapper_params_validator", "mapper_params.")
1618 params = self._get_params(
1619 "params_validator", "params.")
1622 mr_params = map_job.JobConfig._get_default_mr_params()
1623 mr_params.update(params)
1624 if "queue_name" in mapper_params:
1625 mr_params["queue_name"] = mapper_params["queue_name"]
1628 mapper_params["processing_rate"] = int(mapper_params.get(
1629 "processing_rate") or parameters.config.PROCESSING_RATE_PER_SEC)
1632 mapper_spec = model.MapperSpec(
1633 mapper_handler_spec,
1634 mapper_input_reader_spec,
1636 int(mapper_params.get("shard_count", parameters.config.SHARD_COUNT)),
1637 output_writer_spec=mapper_output_writer_spec)
1639 mapreduce_id = self._start_map(
1643 queue_name=mr_params["queue_name"],
1644 _app=mapper_params.get("_app"))
1645 self.json_response["mapreduce_id"] = mapreduce_id
1647 def _get_params(self, validator_parameter, name_prefix):
1648 """Retrieves additional user
-supplied params
for the job
and validates them
.
1651 validator_parameter
: name of the request parameter which supplies
1652 validator
for this parameter
set.
1653 name_prefix
: common prefix
for all parameter names
in the request
.
1656 Any exception raised by the
'params_validator' request parameter
if
1657 the params fail to validate
.
1660 The user parameters
.
1662 params_validator = self.request.get(validator_parameter)
1665 for key in self.request.arguments():
1666 if key.startswith(name_prefix):
1667 values = self.request.get_all(key)
1668 adjusted_key = key[len(name_prefix):]
1669 if len(values) == 1:
1670 user_params[adjusted_key] = values[0]
1672 user_params[adjusted_key] = values
1674 if params_validator:
1675 resolved_validator = util.for_name(params_validator)
1676 resolved_validator(user_params)
1680 def _get_required_param(self, param_name):
1681 """Get a required request parameter
.
1684 param_name
: name of request parameter to fetch
.
1690 errors
.NotEnoughArgumentsError
: if parameter
is not specified
.
1692 value = self.request.get(param_name)
1694 raise errors.NotEnoughArgumentsError(param_name + " not specified")
1705 hooks_class_name=None,
1707 in_xg_transaction=False):
1710 """See control
.start_map
.
1712 Requirements
for this method
:
1713 1. The request that invokes this method can either be regular
or
1714 from taskqueue
. So taskqueue specific headers can
not be used
.
1715 2. Each invocation transactionally starts an isolated mapreduce job with
1716 a unique
id. MapreduceState should be immediately available after
1717 returning
. See control
.start_map
's doc on transactional.
1718 3. Method should be lightweight.
1721 mapper_input_reader_class = mapper_spec.input_reader_class()
1722 mapper_input_reader_class.validate(mapper_spec)
1725 mapper_output_writer_class = mapper_spec.output_writer_class()
1726 if mapper_output_writer_class:
1727 mapper_output_writer_class.validate(mapper_spec)
1730 mapreduce_id = model.MapreduceState.new_mapreduce_id()
1731 mapreduce_spec = model.MapreduceSpec(
1734 mapper_spec.to_json(),
1739 ctx = context.Context(mapreduce_spec, None)
1740 context.Context._set(ctx)
1745 context.Context._set(None)
1748 if in_xg_transaction:
1749 propagation = db.MANDATORY
1751 propagation = db.INDEPENDENT
1753 @db.transactional(propagation=propagation)
1755 cls._create_and_save_state(mapreduce_spec, _app)
1756 cls._add_kickoff_task(mapreduce_params["base_path"], mapreduce_spec, eta,
1757 countdown, queue_name)
def _create_and_save_state(cls, mapreduce_spec, _app):
    """Create the job's MapreduceState and write it to the datastore.

    The state is saved right away so that the UI can see it immediately.

    Args:
      mapreduce_spec: model.MapreduceSpec,
      _app: app id if specified. None otherwise.

    Returns:
      The saved Mapreduce state.
    """
    new_state = model.MapreduceState.create_new(mapreduce_spec.mapreduce_id)
    new_state.mapreduce_spec = mapreduce_spec
    new_state.active = True
    # No shards exist yet at this point.
    new_state.active_shards = 0
    if _app:
        new_state.app_id = _app
    write_config = util.create_datastore_write_config(mapreduce_spec)
    new_state.put(config=write_config)
    return new_state
1786 def _add_kickoff_task(cls,
1792 """Enqueues a new kickoff task."""
1793 params = {"mapreduce_id": mapreduce_spec.mapreduce_id}
1795 kickoff_task = taskqueue.Task(
1796 url=base_path + "/kickoffjob_callback/" + mapreduce_spec.mapreduce_id,
1797 headers=util._get_task_headers(mapreduce_spec.mapreduce_id),
1800 countdown=countdown)
1801 hooks = mapreduce_spec.get_hooks()
1802 if hooks is not None:
1804 hooks.enqueue_kickoff_task(kickoff_task, queue_name)
1806 except NotImplementedError:
1808 kickoff_task.add(queue_name, transactional=True)
1811 class FinalizeJobHandler(base_handler.TaskQueueHandler):
1812 """Finalize map job by deleting all temporary entities."""
1815 mapreduce_id = self.request.get("mapreduce_id")
1816 mapreduce_state = model.MapreduceState.get_by_job_id(mapreduce_id)
1819 util.create_datastore_write_config(mapreduce_state.mapreduce_spec))
1820 keys = [model.MapreduceControl.get_key_by_job_id(mapreduce_id)]
1821 for ss in model.ShardState.find_all_by_mapreduce_state(mapreduce_state):
1823 model._HugeTaskPayload.all().ancestor(ss).run(keys_only=True)))
1824 keys.extend(list(model._HugeTaskPayload.all().ancestor(
1825 mapreduce_state).run(keys_only=True)))
1826 db.delete(keys, config=config)
def schedule(cls, mapreduce_spec):
    """Schedule finalize task.

    The task is named after the mapreduce id, so enqueueing it a second time
    raises an already-exists/tombstoned error, which is logged and ignored.

    Args:
      mapreduce_spec: mapreduce specification as MapreduceSpec.
    """
    task_name = mapreduce_spec.mapreduce_id + "-finalize"
    finalize_task = taskqueue.Task(
        name=task_name,
        url=(mapreduce_spec.params["base_path"] + "/finalizejob_callback/" +
             mapreduce_spec.mapreduce_id),
        params={"mapreduce_id": mapreduce_spec.mapreduce_id},
        headers=util._get_task_headers(mapreduce_spec.mapreduce_id))
    queue_name = util.get_queue_name(None)
    # A hook may take over enqueueing; add the task ourselves otherwise.
    if not _run_task_hook(mapreduce_spec.get_hooks(),
                          "enqueue_controller_task",
                          finalize_task,
                          queue_name):
        try:
            finalize_task.add(queue_name)
        # "as" form is valid on Python 2.6+ and 3; the comma form is 2-only.
        except (taskqueue.TombstonedTaskError,
                taskqueue.TaskAlreadyExistsError) as e:
            logging.warning("Task %r already exists. %s: %s",
                            task_name, e.__class__, e)
class CleanUpJobHandler(base_handler.PostJsonHandler):
    """Command to kick off tasks to clean up a job's data."""

    def handle(self):
        """Delete the job's shard states and state, then report the status."""
        job_id = self.request.get("mapreduce_id")
        job_state = model.MapreduceState.get_by_job_id(job_id)
        if job_state:
            shard_keys = model.ShardState.calculate_keys_by_mapreduce_state(
                job_state)
            db.delete(shard_keys)
            db.delete(job_state)
        status_message = "Job %s successfully cleaned up." % job_id
        self.json_response["status"] = status_message
class AbortJobHandler(base_handler.PostJsonHandler):
    """Command to abort a running job."""

    def handle(self):
        """Send the abort command for the requested job and report it."""
        job_id = self.request.get("mapreduce_id")
        model.MapreduceControl.abort(job_id)
        self.json_response["status"] = "Abort signal sent."