3 # Copyright 2007 Google Inc.
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
9 # http://www.apache.org/licenses/LICENSE-2.0
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
32 """Defines executor tasks handlers for MapReduce implementation."""
49 from google
.appengine
.ext
import ndb
51 from google
.appengine
import runtime
52 from google
.appengine
.api
import datastore_errors
53 from google
.appengine
.api
import logservice
54 from google
.appengine
.api
import modules
55 from google
.appengine
.api
import taskqueue
56 from google
.appengine
.ext
import db
57 from google
.appengine
.ext
.mapreduce
import base_handler
58 from google
.appengine
.ext
.mapreduce
import context
59 from google
.appengine
.ext
.mapreduce
import errors
60 from google
.appengine
.ext
.mapreduce
import input_readers
61 from google
.appengine
.ext
.mapreduce
import model
62 from google
.appengine
.ext
.mapreduce
import operation
63 from google
.appengine
.ext
.mapreduce
import parameters
64 from google
.appengine
.ext
.mapreduce
import util
65 from google
.appengine
.runtime
import apiproxy_errors
69 from google
.appengine
.ext
import cloudstorage
74 if hasattr(cloudstorage
, "_STUB"):
81 _TEST_INJECTED_FAULTS
= set()
84 def _run_task_hook(hooks
, method
, task
, queue_name
):
85 """Invokes hooks.method(task, queue_name).
88 hooks: A hooks.Hooks instance or None.
89 method: The name of the method to invoke on the hooks class e.g.
90 "enqueue_kickoff_task".
91 task: The taskqueue.Task to pass to the hook method.
92 queue_name: The name of the queue to pass to the hook method.
95 True if the hooks.Hooks instance handled the method, False otherwise.
99 getattr(hooks
, method
)(task
, queue_name
)
100 except NotImplementedError:
108 class MapperWorkerCallbackHandler(base_handler
.HugeTaskHandler
):
109 """Callback handler for mapreduce worker task."""
# Outcomes that handling a worker slice can resolve to. The values are
# produced by _try_acquire_lease and the retry helpers and consumed by
# handle / _save_state_and_schedule_next.
_TASK_STATE = util._enum(
    # The task (slice) should be retried.
    RETRY_TASK="retry_task",
    # The task should be dropped.
    DROP_TASK="drop_task",
    # The lease was acquired; the slice may proceed.
    PROCEED_TASK="proceed_task",
    # The whole shard should be retried.
    RETRY_SHARD="retry_shard",
    # The shard failed permanently.
    FAIL_TASK="fail_task")
def __init__(self, *args):
    """Constructor.

    Args:
      *args: positional arguments, forwarded unchanged to the base handler.
    """
    super(MapperWorkerCallbackHandler, self).__init__(*args)
    # Clock function; the handler reads the current time via self._time().
    self._time = time.time
def _drop_gracefully(self):
    """Drop worker task gracefully.

    Set current shard_state to failed. Controller logic will take care of
    other shards and the entire MR.
    """
    # Both ids are read from the task request headers.
    shard_id = self.request.headers[util._MR_SHARD_ID_TASK_HEADER]
    mr_id = self.request.headers[util._MR_ID_TASK_HEADER]

    # Fetch the shard state and the job state with a single batched get.
    shard_state, mr_state = db.get([
        model.ShardState.get_key_by_shard_id(shard_id),
        model.MapreduceState.get_key_by_job_id(mr_id)])

    # Only an existing shard that is still active is marked as failed.
    if shard_state and shard_state.active:
        shard_state.set_for_failure()
        config = util.create_datastore_write_config(mr_state.mapreduce_spec)
        shard_state.put(config=config)
145 def _try_acquire_lease(self
, shard_state
, tstate
):
146 """Validate datastore and the task payload are consistent.
148 If so, attempt to get a lease on this slice's execution.
149 See model.ShardState doc on slice_start_time.
152 shard_state: model.ShardState from datastore.
153 tstate: model.TransientShardState from taskqueue paylod.
156 A _TASK_STATE enum. PROCEED_TASK if lock is acquired.
157 RETRY_TASK if task should be retried, DROP_TASK if task should
158 be dropped. Only old tasks (comparing to datastore state)
159 will be dropped. Future tasks are retried until they naturally
160 become old so that we don't ever stuck MR.
164 logging
.warning("State not found for shard %s; Possible spurious task "
165 "execution. Dropping this task.",
167 return self
._TASK
_STATE
.DROP_TASK
169 if not shard_state
.active
:
170 logging
.warning("Shard %s is not active. Possible spurious task "
171 "execution. Dropping this task.", tstate
.shard_id
)
172 logging
.warning(str(shard_state
))
173 return self
._TASK
_STATE
.DROP_TASK
176 if shard_state
.retries
> tstate
.retries
:
178 "Got shard %s from previous shard retry %s. Possible spurious "
179 "task execution. Dropping this task.",
182 logging
.warning(str(shard_state
))
183 return self
._TASK
_STATE
.DROP_TASK
184 elif shard_state
.retries
< tstate
.retries
:
189 "ShardState for %s is behind slice. Waiting for it to catch up",
190 shard_state
.shard_id
)
191 return self
._TASK
_STATE
.RETRY_TASK
195 if shard_state
.slice_id
> tstate
.slice_id
:
197 "Task %s-%s is behind ShardState %s. Dropping task.""",
198 tstate.shard_id, tstate.slice_id, shard_state.slice_id)
199 return self._TASK_STATE.DROP_TASK
203 elif shard_state.slice_id < tstate.slice_id:
205 "Task
%s-%s is ahead of ShardState
%s. Waiting
for it to catch up
.",
206 tstate.shard_id, tstate.slice_id, shard_state.slice_id)
207 return self._TASK_STATE.RETRY_TASK
211 if shard_state.slice_start_time:
212 countdown = self._wait_time(shard_state,
213 parameters.config._LEASE_GRACE_PERIOD +
214 parameters.config._SLICE_DURATION_SEC)
217 "Last retry of
slice %s-%s may be still running
."
218 "Will
try again
in %s seconds
", tstate.shard_id, tstate.slice_id,
223 time.sleep(countdown)
224 return self._TASK_STATE.RETRY_TASK
227 if self._wait_time(shard_state,
228 parameters.config._REQUEST_EVENTUAL_TIMEOUT):
229 if not self._old_request_ended(shard_state):
231 "Last retry of
slice %s-%s is still
in flight with request_id
"
232 "%s. Will
try again later
.", tstate.shard_id, tstate.slice_id,
233 shard_state.slice_request_id)
234 return self._TASK_STATE.RETRY_TASK
237 "Last retry of
slice %s-%s has no log entry
and has
"
238 "timed out after
%s seconds
",
239 tstate.shard_id, tstate.slice_id,
240 parameters.config._REQUEST_EVENTUAL_TIMEOUT)
243 config = util.create_datastore_write_config(tstate.mapreduce_spec)
244 @db.transactional(retries=5)
246 """Use datastore to set slice_start_time to now.
248 If failed for any reason, raise error to retry the task (hence all
249 the previous validation code). The task would die naturally eventually.
254 fresh_state = model.ShardState.get_by_shard_id(tstate.shard_id)
256 logging.error("ShardState missing
.")
258 if (fresh_state.active and
259 fresh_state.slice_id == shard_state.slice_id and
260 fresh_state.slice_start_time == shard_state.slice_start_time):
261 shard_state.slice_start_time = datetime.datetime.now()
262 shard_state.slice_request_id = os.environ.get("REQUEST_LOG_ID
")
263 shard_state.acquired_once = True
264 shard_state.put(config=config)
265 return self._TASK_STATE.PROCEED_TASK
268 "Contention on
slice %s-%s execution
. Will retry again
.",
269 tstate.shard_id, tstate.slice_id)
271 time.sleep(random.randrange(1, 5))
272 return self._TASK_STATE.RETRY_TASK
276 def _old_request_ended(self, shard_state):
277 """Whether previous slice retry has ended according to Logs API.
280 shard_state: shard state.
283 True if the request of previous slice retry has ended. False if it has
286 assert shard_state.slice_start_time is not None
287 assert shard_state.slice_request_id is not None
288 request_ids = [shard_state.slice_request_id]
289 logs = list(logservice.fetch(
290 request_ids=request_ids,
292 module_versions=[(os.environ["CURRENT_MODULE_ID
"],
293 modules.get_current_version_name())]))
295 if not logs or not logs[0].finished:
299 def _wait_time(self, shard_state, secs, now=datetime.datetime.now):
300 """Time to wait until slice_start_time is secs ago from now.
303 shard_state: shard state.
304 secs: duration in seconds.
305 now: a func that gets now.
308 0 if no wait. A positive int in seconds otherwise. Always around up.
310 assert shard_state.slice_start_time is not None
311 delta = now() - shard_state.slice_start_time
312 duration = datetime.timedelta(seconds=secs)
314 return util.total_seconds(duration - delta)
318 def _try_free_lease(self, shard_state, slice_retry=False):
319 """Try to free lease.
321 A lightweight transaction to update shard_state and unset
322 slice_start_time to allow the next retry to happen without blocking.
323 We don't care if this fails or not because the lease will expire
326 Under normal execution, _save_state_and_schedule_next is the exit point.
327 It updates/saves shard state and schedules the next slice or returns.
328 Other exit points are:
329 1. _are_states_consistent: at the beginning of handle, checks
330 if datastore states and the task are in sync.
331 If not, raise or return.
332 2. _attempt_slice_retry: may raise exception to taskqueue.
333 3. _save_state_and_schedule_next: may raise exception when taskqueue/db
336 This handler should try to free the lease on every exceptional exit point.
339 shard_state: model.ShardState.
340 slice_retry: whether to count this as a failed slice execution.
344 fresh_state = model.ShardState.get_by_shard_id(shard_state.shard_id)
345 if fresh_state and fresh_state.active:
347 fresh_state.slice_start_time = None
348 fresh_state.slice_request_id = None
350 fresh_state.slice_retries += 1
358 "Release lock
for shard
%s failed
. Wait
for lease to expire
.",
359 shard_state.shard_id)
364 This method has to be careful to pass the same ShardState instance to
365 its subroutines calls if the calls mutate or read from ShardState.
366 Note especially that Context instance caches and updates the ShardState
370 Set HTTP status code and always returns None.
372 self._start_time = self._time()
374 shard_id = self.request.headers[util._MR_SHARD_ID_TASK_HEADER]
375 mr_id = self.request.headers[util._MR_ID_TASK_HEADER]
376 spec = model.MapreduceSpec._get_mapreduce_spec(mr_id)
377 shard_state, control = db.get([
378 model.ShardState.get_key_by_shard_id(shard_id),
379 model.MapreduceControl.get_key_by_job_id(mr_id),
383 ctx = context.Context(spec, shard_state,
384 task_retry_count=self.task_retry_count())
385 context.Context._set(ctx)
388 tstate = model.TransientShardState.from_request(self.request)
389 task_state = self._try_acquire_lease(shard_state, tstate)
390 if task_state == self._TASK_STATE.RETRY_TASK:
391 return self.retry_task()
392 if task_state == self._TASK_STATE.DROP_TASK:
394 assert task_state == self._TASK_STATE.PROCEED_TASK
396 if control and control.command == model.MapreduceControl.ABORT:
397 logging.info("Abort command received by shard
%d of job
'%s'",
398 shard_state.shard_number, shard_state.mapreduce_id)
401 shard_state.set_for_abort()
402 shard_state.put(config=util.create_datastore_write_config(spec))
407 util._set_ndb_cache_policy()
409 cloudstorage.set_default_retry_params(
410 cloudstorage.RetryParams(
411 urlfetch_timeout=parameters._GCS_URLFETCH_TIMEOUT_SEC))
414 finished_shard = self.process_inputs(
415 tstate.input_reader, shard_state, tstate, ctx)
420 if tstate.output_writer:
425 tstate.output_writer.finalize(ctx, shard_state)
426 shard_state.set_for_success()
429 task_state = self._retry_logic(
430 e, shard_state, tstate, spec.mapreduce_id)
432 if task_state == self._TASK_STATE.RETRY_TASK:
434 return self.retry_task()
435 self._save_state_and_schedule_next(shard_state, tstate, task_state)
437 def process_inputs(self,
442 """Read inputs, process them, and write out outputs.
444 This is the core logic of MapReduce. It reads inputs from input reader,
445 invokes user specified mapper function, and writes output with
446 output writer. It also updates shard_state accordingly.
447 e.g. if shard processing is done, set shard_state.active to False.
449 If errors.FailJobError is caught, it will fail this MR job.
450 All other exceptions will be logged and raised to taskqueue for retry
451 until the number of retries exceeds a limit.
454 input_reader: input reader.
455 shard_state: shard state.
456 tstate: transient shard state.
457 ctx: mapreduce context.
460 Whether this shard has finished processing all its input split.
462 processing_limit = self._processing_limit(tstate.mapreduce_spec)
463 if processing_limit == 0:
466 finished_shard = True
468 for entity in input_reader:
469 if isinstance(entity, db.Model):
470 shard_state.last_work_item = repr(entity.key())
471 elif isinstance(entity, ndb.Model):
472 shard_state.last_work_item = repr(entity.key)
474 shard_state.last_work_item = repr(entity)[:100]
476 processing_limit -= 1
478 if not self.process_data(
479 entity, input_reader, ctx, tstate):
480 finished_shard = False
482 elif processing_limit == 0:
483 finished_shard = False
487 operation.counters.Increment(
488 context.COUNTER_MAPPER_WALLTIME_MS,
489 int((self._time() - self._start_time)*1000))(ctx)
492 return finished_shard
494 def process_data(self, data, input_reader, ctx, transient_shard_state):
495 """Process a single data piece.
497 Call mapper handler on the data.
500 data: a datum to process.
501 input_reader: input reader.
502 ctx: mapreduce context
503 transient_shard_state: transient shard state.
506 True if scan should be continued, False if scan should be stopped.
508 if data is not input_readers.ALLOW_CHECKPOINT:
509 ctx.counters.increment(context.COUNTER_MAPPER_CALLS)
511 handler = transient_shard_state.handler
513 if input_reader.expand_parameters:
514 result = handler(*data)
516 result = handler(data)
518 if util.is_generator(result):
519 for output in result:
520 if isinstance(output, operation.Operation):
523 output_writer = transient_shard_state.output_writer
524 if not output_writer:
526 "Handler yielded
%s, but no output writer
is set.", output)
528 output_writer.write(output)
530 if self._time() - self._start_time >= parameters.config._SLICE_DURATION_SEC:
534 def _save_state_and_schedule_next(self, shard_state, tstate, task_state):
535 """Save state to datastore and schedule next task for this shard.
537 Update and save shard state. Schedule next slice if needed.
538 This method handles interactions with datastore and taskqueue.
541 shard_state: model.ShardState for current shard.
542 tstate: model.TransientShardState for current shard.
543 task_state: enum _TASK_STATE.
546 spec = tstate.mapreduce_spec
547 config = util.create_datastore_write_config(spec)
550 if task_state == self._TASK_STATE.RETRY_SHARD:
553 task = self._state_to_task(tstate, shard_state)
554 elif task_state == self._TASK_STATE.PROCEED_TASK:
555 shard_state.advance_for_next_slice()
556 tstate.advance_for_next_slice()
557 countdown = self._get_countdown_for_next_slice(spec)
558 task = self._state_to_task(tstate, shard_state, countdown=countdown)
560 assert task_state == self._TASK_STATE.FAIL_TASK
563 queue_name = os.environ.get("HTTP_X_APPENGINE_QUEUENAME
", "default
")
565 @db.transactional(retries=5)
567 fresh_shard_state = model.ShardState.get_by_shard_id(tstate.shard_id)
568 if not fresh_shard_state:
570 if (not fresh_shard_state.active or
571 "worker_active_state_collision
" in _TEST_INJECTED_FAULTS):
572 logging.error("Shard
%s is not active
. Possible spurious task
"
573 "execution
. Dropping this task
.", tstate.shard_id)
574 logging.error("Datastore
's %s", str(fresh_shard_state))
575 logging.error("Slice's
%s", str(shard_state))
577 fresh_shard_state.copy_from(shard_state)
578 fresh_shard_state.put(config=config)
583 if fresh_shard_state.active:
586 self._add_task(task, spec, queue_name)
590 except (datastore_errors.Error,
592 runtime.DeadlineExceededError,
593 apiproxy_errors.Error), e:
595 "Can
't transactionally continue shard. "
596 "Will retry slice %s %s for the %s time.",
599 self.task_retry_count() + 1)
600 self._try_free_lease(shard_state)
605 def _retry_logic(self, e, shard_state, tstate, mr_id):
606 """Handle retry for this slice.
608 This method may modify shard_state and tstate to prepare for retry or fail.
611 e: the exception caught.
612 shard_state: model.ShardState for current shard.
613 tstate: model.TransientShardState for current shard.
617 A _TASK_STATE enum. RETRY_SHARD if shard should be retried.
618 RETRY_TASK if slice should be retried. FAIL_TASK otherwise.
620 logging.error("Shard %s got error.", shard_state.shard_id)
623 logging.error(traceback.format_exc())
626 if type(e) is errors.FailJobError:
627 logging.error("Got FailJobError. Shard %s failed permanently.",
628 shard_state.shard_id)
629 shard_state.set_for_failure()
630 return self._TASK_STATE.FAIL_TASK
632 task_state = self._attempt_slice_retry(shard_state, tstate)
633 if task_state == self._TASK_STATE.RETRY_SHARD:
634 task_state = self._attempt_shard_retry(shard_state, tstate, mr_id)
635 if task_state == self._TASK_STATE.FAIL_TASK:
636 shard_state.set_for_failure()
637 logging.error("Shard %s failed permanently.", shard_state.shard_id)
640 def _attempt_shard_retry(self, shard_state, tstate, mr_id):
641 """Whether to retry shard.
643 This method may modify shard_state and tstate to prepare for retry or fail.
646 shard_state: model.ShardState for current shard.
647 tstate: model.TransientShardState for current shard.
651 A _TASK_STATE enum. RETRY_SHARD if shard should be retried.
654 shard_attempts = shard_state.retries + 1
656 if shard_attempts >= parameters.config.SHARD_MAX_ATTEMPTS:
658 "Shard attempt %s exceeded %s max attempts.",
659 shard_attempts, parameters.config.SHARD_MAX_ATTEMPTS)
660 return self._TASK_STATE.FAIL_TASK
661 if tstate.output_writer and (
662 not tstate.output_writer._can_be_retried(tstate)):
663 logging.error("Output writer %s does not support shard retry.",
664 tstate.output_writer.__class__.__name__)
665 return self._TASK_STATE.FAIL_TASK
667 shard_state.reset_for_retry()
668 logging.error("Shard %s attempt %s failed with up to %s attempts.",
669 shard_state.shard_id,
671 parameters.config.SHARD_MAX_ATTEMPTS)
673 if tstate.output_writer:
674 mr_state = model.MapreduceState.get_by_job_id(mr_id)
675 output_writer = tstate.output_writer.create(
676 mr_state, shard_state)
677 tstate.reset_for_retry(output_writer)
678 return self._TASK_STATE.RETRY_SHARD
680 def _attempt_slice_retry(self, shard_state, tstate):
681 """Attempt to retry this slice.
683 This method may modify shard_state and tstate to prepare for retry or fail.
686 shard_state: model.ShardState for current shard.
687 tstate: model.TransientShardState for current shard.
690 A _TASK_STATE enum. RETRY_TASK if slice should be retried.
691 RETRY_SHARD if shard retry should be attempted.
693 if (shard_state.slice_retries + 1 <
694 parameters.config.TASK_MAX_DATA_PROCESSING_ATTEMPTS):
696 "Slice %s %s failed for the %s of up to %s attempts "
697 "(%s of %s taskqueue execution attempts). "
701 shard_state.slice_retries + 1,
702 parameters.config.TASK_MAX_DATA_PROCESSING_ATTEMPTS,
703 self.task_retry_count() + 1,
704 parameters.config.TASK_MAX_ATTEMPTS)
709 self._try_free_lease(shard_state, slice_retry=True)
710 return self._TASK_STATE.RETRY_TASK
712 if parameters.config.TASK_MAX_DATA_PROCESSING_ATTEMPTS > 0:
713 logging.error("Slice attempt %s exceeded %s max attempts.",
714 self.task_retry_count() + 1,
715 parameters.config.TASK_MAX_DATA_PROCESSING_ATTEMPTS)
716 return self._TASK_STATE.RETRY_SHARD
@staticmethod
def get_task_name(shard_id, slice_id, retry=0):
    """Compute single worker task name.

    Args:
      shard_id: shard id.
      slice_id: slice id.
      retry: current shard retry count.

    Returns:
      task name which should be used to process specified shard/slice.
    """
    return "appengine-mrshard-%s-%s-retry-%s" % (
        shard_id, slice_id, retry)
735 def _get_countdown_for_next_slice(self, spec):
736 """Get countdown for next slice's task
.
738 When user sets processing rate
, we
set countdown to delay task execution
.
741 spec
: model
.MapreduceSpec
747 if self._processing_limit(spec) != -1:
749 int(parameters.config._SLICE_DURATION_SEC -
750 (self._time() - self._start_time)), 0)
754 def _state_to_task(cls,
759 """Generate task
for slice according to current states
.
762 tstate
: An instance of TransientShardState
.
763 shard_state
: An instance of ShardState
.
764 eta
: Absolute time when the MR should execute
. May
not be specified
765 if 'countdown' is also supplied
. This may be timezone
-aware
or
767 countdown
: Time
in seconds into the future that this MR should execute
.
771 A model
.HugeTask instance
for the
slice specified by current states
.
773 base_path = tstate.base_path
775 task_name = MapperWorkerCallbackHandler.get_task_name(
780 headers = util._get_task_headers(tstate.mapreduce_spec)
781 headers[util._MR_SHARD_ID_TASK_HEADER] = tstate.shard_id
783 worker_task = model.HugeTask(
784 url=base_path + "/worker_callback",
785 params=tstate.to_dict(),
798 """Schedule
slice scanning by adding it to the task queue
.
801 worker_task
: a model
.HugeTask task
for slice. This
is NOT a taskqueue
803 mapreduce_spec
: an instance of model
.MapreduceSpec
.
804 queue_name
: Optional queue to run on
; uses the current queue of
805 execution
or the default queue
if unspecified
.
807 if not _run_task_hook(mapreduce_spec.get_hooks(),
808 "enqueue_worker_task",
814 worker_task.add(queue_name)
815 except (taskqueue.TombstonedTaskError,
816 taskqueue.TaskAlreadyExistsError), e:
817 logging.warning("Task %r already exists. %s: %s",
def _processing_limit(self, spec):
    """Get the limit on the number of map calls allowed by this slice.

    Args:
      spec: a Mapreduce spec.

    Returns:
      The limit as a positive int if specified by user. -1 otherwise.
    """
    # "processing_rate" is an optional mapper parameter; it defaults to 0,
    # which leaves the limit at -1.
    processing_rate = float(spec.mapper.params.get("processing_rate", 0))
    slice_processing_limit = -1
    if processing_rate > 0:
        # Rate times slice duration, divided across shards, rounded up.
        slice_processing_limit = int(math.ceil(
            parameters.config._SLICE_DURATION_SEC*processing_rate/
            int(spec.mapper.shard_count)))
    return slice_processing_limit
842 def _schedule_slice(cls,
848 """Schedule
slice scanning by adding it to the task queue
.
851 shard_state
: An instance of ShardState
.
852 tstate
: An instance of TransientShardState
.
853 queue_name
: Optional queue to run on
; uses the current queue of
854 execution
or the default queue
if unspecified
.
855 eta
: Absolute time when the MR should execute
. May
not be specified
856 if 'countdown' is also supplied
. This may be timezone
-aware
or
858 countdown
: Time
in seconds into the future that this MR should execute
.
861 queue_name = queue_name or os.environ.get("HTTP_X_APPENGINE_QUEUENAME",
863 task = cls._state_to_task(tstate, shard_state, eta, countdown)
864 cls._add_task(task, tstate.mapreduce_spec, queue_name)
867 class ControllerCallbackHandler(base_handler.HugeTaskHandler):
868 """Supervises mapreduce execution
.
870 Is also responsible
for gathering execution status
from shards together
.
872 This task
is "continuously" running by adding itself again to taskqueue
if
873 and only
if mapreduce
is still active
. A mapreduce
is active
if it has
874 actively running shards
.
def __init__(self, *args):
    """Constructor.

    Args:
      *args: positional arguments, forwarded unchanged to the base handler.
    """
    super(ControllerCallbackHandler, self).__init__(*args)
    # Clock function; the handler reads the current time via self._time().
    self._time = time.time
882 def _drop_gracefully(self):
883 """Gracefully drop controller task
.
885 This method
is called when decoding controller task payload failed
.
886 Upon this we mark ShardState
and MapreduceState
as failed so all
889 Writing to datastore
is forced (ignore read
-only mode
) because we
890 want the tasks to stop badly
, and if force_writes was
False,
891 the job would have never been started
.
893 mr_id = self.request.headers[util._MR_ID_TASK_HEADER]
894 state = model.MapreduceState.get_by_job_id(mr_id)
895 if not state or not state.active:
899 state.result_status = model.MapreduceState.RESULT_FAILED
900 config = util.create_datastore_write_config(state.mapreduce_spec)
902 for ss in model.ShardState.find_all_by_mapreduce_state(state):
907 if len(puts) > model.ShardState._MAX_STATES_IN_MEMORY:
908 db.put(puts, config=config)
910 db.put(puts, config=config)
912 db.put(state, config=config)
915 """Handle request
."""
916 spec = model.MapreduceSpec.from_json_str(
917 self.request.get("mapreduce_spec"))
918 state, control = db.get([
919 model.MapreduceState.get_key_by_job_id(spec.mapreduce_id),
920 model.MapreduceControl.get_key_by_job_id(spec.mapreduce_id),
924 logging.warning("State not found for MR '%s'; dropping controller task.",
929 "MR %r is not active. Looks like spurious controller task execution.",
931 self._clean_up_mr(spec, self.base_path())
934 shard_states = model.ShardState.find_all_by_mapreduce_state(state)
935 self._update_state_from_shard_states(state, shard_states, control)
938 ControllerCallbackHandler.reschedule(
939 state, self.base_path(), spec, self.serial_id() + 1)
941 def _update_state_from_shard_states(self, state, shard_states, control):
942 """Update mr state by examing shard states
.
945 state
: current mapreduce state
as MapreduceState
.
946 shard_states
: an iterator over shard states
.
947 control
: model
.MapreduceControl entity
.
950 state.active_shards, state.aborted_shards, state.failed_shards = 0, 0, 0
952 processed_counts = []
953 state.counters_map.clear()
956 for s in shard_states:
959 state.active_shards += 1
960 if s.result_status == model.ShardState.RESULT_ABORTED:
961 state.aborted_shards += 1
962 elif s.result_status == model.ShardState.RESULT_FAILED:
963 state.failed_shards += 1
966 state.counters_map.add_map(s.counters_map)
967 processed_counts.append(s.counters_map.get(context.COUNTER_MAPPER_CALLS))
969 state.set_processed_counts(processed_counts)
970 state.last_poll_time = datetime.datetime.utcfromtimestamp(self._time())
972 spec = state.mapreduce_spec
974 if total_shards != spec.mapper.shard_count:
975 logging.error("Found %d shard states. Expect %d. "
976 "Issuing abort command to job '%s'",
977 total_shards, spec.mapper.shard_count,
980 model.MapreduceControl.abort(spec.mapreduce_id)
984 state.active = bool(state.active_shards)
985 if not control and (state.failed_shards or state.aborted_shards):
987 model.MapreduceControl.abort(spec.mapreduce_id)
991 if state.failed_shards or not total_shards:
992 state.result_status = model.MapreduceState.RESULT_FAILED
995 elif state.aborted_shards:
996 state.result_status = model.MapreduceState.RESULT_ABORTED
998 state.result_status = model.MapreduceState.RESULT_SUCCESS
999 self._finalize_outputs(spec, state)
1000 self._finalize_job(spec, state, self.base_path())
1002 @db.transactional(retries=5)
1004 fresh_state = model.MapreduceState.get_by_job_id(spec.mapreduce_id)
1007 if not fresh_state.active:
1009 "Job %s is not active. Looks like spurious task execution. "
1010 "Dropping controller task.", spec.mapreduce_id)
1012 config = util.create_datastore_write_config(spec)
1013 state.put(config=config)
def serial_id(self):
    """Get serial unique identifier of this task from request.

    Returns:
      serial identifier as int.
    """
    # The id arrives as the "serial_id" request parameter; int() raises
    # ValueError if it is not a valid integer string.
    return int(self.request.get("serial_id"))
@classmethod
def _finalize_outputs(cls, mapreduce_spec, mapreduce_state):
  """Finalize the job's outputs through the mapper's output writer.

  Does nothing when the mapper has no output writer class, or when the job's
  result status is not RESULT_SUCCESS.

  Args:
    mapreduce_spec: an instance of MapreduceSpec.
    mapreduce_state: an instance of MapreduceState.
  """
  if not mapreduce_spec.mapper.output_writer_class():
    return
  if mapreduce_state.result_status != model.MapreduceState.RESULT_SUCCESS:
    return
  mapreduce_spec.mapper.output_writer_class().finalize_job(mapreduce_state)
@classmethod
def _finalize_job(cls, mapreduce_spec, mapreduce_state, base_path):
  """Finalize job execution.

  Invokes done callback and saves mapreduce state in a transaction,
  and schedules necessary clean ups. This method is idempotent.

  Args:
    mapreduce_spec: an instance of MapreduceSpec.
    mapreduce_state: an instance of MapreduceState.
    base_path: handler base path.
  """
  config = util.create_datastore_write_config(mapreduce_spec)
  queue_name = util.get_queue_name(mapreduce_spec.params.get(
      model.MapreduceSpec.PARAM_DONE_CALLBACK_QUEUE))
  done_callback = mapreduce_spec.params.get(
      model.MapreduceSpec.PARAM_DONE_CALLBACK)

  # The done-callback task only exists when the job asked for a callback.
  done_task = None
  if done_callback:
    done_task = taskqueue.Task(
        url=done_callback,
        headers=util._get_task_headers(mapreduce_spec,
                                       util.CALLBACK_MR_ID_TASK_HEADER),
        method=mapreduce_spec.params.get("done_callback_method", "POST"))

  @db.transactional(retries=5)
  def _save_final_state():
    """Store the final state and enqueue the done callback atomically."""
    fresh_state = model.MapreduceState.get_by_job_id(
        mapreduce_spec.mapreduce_id)
    if not fresh_state.active:
      logging.warning(
          "Job %s is not active. Looks like spurious task execution. "
          "Dropping task.", mapreduce_spec.mapreduce_id)
      return
    mapreduce_state.put(config=config)
    # Enqueue the done callback unless a user hook already handled it.
    if done_task and not _run_task_hook(
        mapreduce_spec.get_hooks(),
        "enqueue_done_task",
        done_task,
        queue_name):
      done_task.add(queue_name, transactional=True)

  _save_final_state()
  logging.info("Final result for job '%s' is '%s'",
               mapreduce_spec.mapreduce_id, mapreduce_state.result_status)
  cls._clean_up_mr(mapreduce_spec, base_path)
@classmethod
def _clean_up_mr(cls, mapreduce_spec, base_path):
  """Schedule the finalize task for the given job.

  Args:
    mapreduce_spec: an instance of MapreduceSpec.
    base_path: handler base path.
  """
  FinalizeJobHandler.schedule(base_path, mapreduce_spec)
@staticmethod
def get_task_name(mapreduce_spec, serial_id):
  """Compute the name of a single controller task.

  Args:
    mapreduce_spec: specification of the mapreduce; its mapreduce_id is
      embedded in the task name.
    serial_id: id of the controller invocation.

  Returns:
    The task name which should be used for this controller invocation.
  """
  name_parts = (mapreduce_spec.mapreduce_id, serial_id)
  return "appengine-mrcontrol-%s-%s" % name_parts
@staticmethod
def controller_parameters(mapreduce_spec, serial_id):
  """Fill in controller task parameters.

  Returned parameters map is to be used as task payload, and it contains
  all the data required by the controller to perform its function.

  Args:
    mapreduce_spec: specification of the mapreduce.
    serial_id: id of the invocation as int.

  Returns:
    string->string map of parameters to be used as task payload.
  """
  return {
      "mapreduce_spec": mapreduce_spec.to_json_str(),
      "serial_id": str(serial_id),
  }
@classmethod
def reschedule(cls,
               mapreduce_state,
               base_path,
               mapreduce_spec,
               serial_id,
               queue_name=None):
  """Schedule new update status callback task.

  Args:
    mapreduce_state: mapreduce state as model.MapreduceState.
    base_path: mapreduce handlers url base path as string.
    mapreduce_spec: mapreduce specification as MapreduceSpec.
    serial_id: id of the invocation as int.
    queue_name: The queue to schedule this task on. Will use the current
      queue of execution if not supplied.
  """
  task_name = ControllerCallbackHandler.get_task_name(
      mapreduce_spec, serial_id)
  task_params = ControllerCallbackHandler.controller_parameters(
      mapreduce_spec, serial_id)
  if not queue_name:
    # Fall back to the queue named in the request environment.
    queue_name = os.environ.get("HTTP_X_APPENGINE_QUEUENAME", "default")

  controller_callback_task = model.HugeTask(
      url=base_path + "/controller_callback",
      name=task_name, params=task_params,
      countdown=parameters.config._CONTROLLER_PERIOD_SEC,
      parent=mapreduce_state,
      headers=util._get_task_headers(mapreduce_spec))

  # A user hook may take over enqueueing; only add the task ourselves when
  # it did not.
  if _run_task_hook(mapreduce_spec.get_hooks(),
                    "enqueue_controller_task",
                    controller_callback_task,
                    queue_name):
    return
  try:
    controller_callback_task.add(queue_name)
  except (taskqueue.TombstonedTaskError,
          taskqueue.TaskAlreadyExistsError) as e:
    # The task name is derived from the job id and serial id, so a duplicate
    # means this invocation was already scheduled; log and carry on.
    logging.warning("Task %r with params %r already exists. %s: %s",
                    task_name, task_params, e.__class__, e)
class KickOffJobHandler(base_handler.TaskQueueHandler):
  """Taskqueue handler which kicks off a mapreduce processing.

  This handler is idempotent.

  Precondition:
    The Model.MapreduceState entity for this mr is already created and
    saved to datastore by StartJobHandler._start_map.

  Request Parameters:
    mapreduce_id: in string.
  """

  # Key name template of the entity holding a job's serialized input readers;
  # formatted with the id or name of the MapreduceState key.
  _SERIALIZED_INPUT_READERS_KEY = "input_readers_for_mr_%s"

  def handle(self):
    """Handles kick off request."""
    mr_id = self.request.get("mapreduce_id")

    logging.info("Processing kickoff for job %s", mr_id)
    state = model.MapreduceState.get_by_job_id(mr_id)
    if not self._check_mr_state(state, mr_id):
      return

    readers, serialized_readers_entity = self._get_input_readers(state)
    if readers is None:
      # Nothing to process: finish the job right away as a success.
      logging.warning("Found no mapper input data to process.")
      state.active = False
      state.result_status = model.MapreduceState.RESULT_SUCCESS
      ControllerCallbackHandler._finalize_job(
          state.mapreduce_spec, state, self.base_path())
      return

    self._setup_output_writer(state)

    result = self._save_states(state, serialized_readers_entity)
    if result is None:
      # An earlier attempt already saved the states; fetch the readers again
      # so this run works from what is stored.
      readers, _ = self._get_input_readers(state)
    elif not result:
      return

    queue_name = self.request.headers.get("X-AppEngine-QueueName")
    KickOffJobHandler._schedule_shards(state.mapreduce_spec, readers,
                                       queue_name, self.base_path(), state)

    ControllerCallbackHandler.reschedule(
        state, self.base_path(), state.mapreduce_spec, serial_id=0,
        queue_name=queue_name)

  def _get_input_readers(self, state):
    """Get input readers.

    Readers are loaded from the stored payload entity when it exists;
    otherwise they are produced by splitting the mapper's input. On success
    the shard count of the mapper spec and state.active_shards are set to
    the number of readers.

    Args:
      state: a MapreduceState model.

    Returns:
      A tuple: (a list of input readers, a model._HugeTaskPayload entity).
      The payload entity contains the json serialized input readers.
      (None, None) when input reader splitting returned no data to process.
    """
    serialized_input_readers_key = (self._SERIALIZED_INPUT_READERS_KEY %
                                    state.key().id_or_name())
    serialized_input_readers = model._HugeTaskPayload.get_by_key_name(
        serialized_input_readers_key, parent=state)

    input_reader_class = state.mapreduce_spec.mapper.input_reader_class()
    if serialized_input_readers is None:
      readers = input_reader_class.split_input(
          state.mapreduce_spec.mapper)
    else:
      readers = [input_reader_class.from_json_str(reader_json)
                 for reader_json in
                 simplejson.loads(serialized_input_readers.payload)]

    if not readers:
      return None, None

    state.mapreduce_spec.mapper.shard_count = len(readers)
    state.active_shards = len(readers)

    if serialized_input_readers is None:
      # First time through: build the payload entity so the caller can save
      # it together with the state.
      serialized_input_readers = model._HugeTaskPayload(
          key_name=serialized_input_readers_key, parent=state)
      readers_json_str = [reader.to_json_str() for reader in readers]
      serialized_input_readers.payload = simplejson.dumps(readers_json_str)
    return readers, serialized_input_readers

  def _setup_output_writer(self, state):
    """Initialize the job's output writer unless state already has one.

    Args:
      state: a MapreduceState model.
    """
    if state.writer_state:
      return
    output_writer_class = state.mapreduce_spec.mapper.output_writer_class()
    if output_writer_class:
      output_writer_class.init_job(state)

  @db.transactional
  def _save_states(self, state, serialized_readers_entity):
    """Run transaction to save state.

    Args:
      state: a model.MapreduceState entity.
      serialized_readers_entity: a model._HugeTaskPayload entity containing
        json serialized input readers.

    Returns:
      False if a fatal error is encountered and this task should be dropped
      immediately. True if transaction is successful. None if a previous
      attempt of this same transaction has already succeeded.
    """
    mr_id = state.key().id_or_name()
    fresh_state = model.MapreduceState.get_by_job_id(mr_id)
    if not self._check_mr_state(fresh_state, mr_id):
      return False
    if fresh_state.active_shards != 0:
      logging.warning(
          "Mapreduce %s already has active shards. Looks like spurious task "
          "execution.", mr_id)
      return None
    config = util.create_datastore_write_config(state.mapreduce_spec)
    db.put([state, serialized_readers_entity], config=config)
    return True

  @classmethod
  def _schedule_shards(cls,
                       spec,
                       readers,
                       queue_name,
                       base_path,
                       mr_state):
    """Prepares shard states and schedules their execution.

    Even though this method does not schedule shard task and save shard state
    transactionally, it's safe for taskqueue to retry this logic because
    the initial shard_state for each shard is the same from any retry.
    This is an important yet reasonable assumption on model.ShardState.

    Args:
      spec: mapreduce specification as MapreduceSpec.
      readers: list of InputReaders describing shard splits.
      queue_name: The queue to run this job on.
      base_path: The base url path of mapreduce callbacks.
      mr_state: The MapReduceState of current job.
    """
    # Build the initial state of every shard.
    shard_states = []
    for shard_number, input_reader in enumerate(readers):
      shard_state = model.ShardState.create_new(spec.mapreduce_id, shard_number)
      shard_state.shard_description = str(input_reader)
      shard_states.append(shard_state)

    # Find the shard states that are already stored.
    existing_shard_states = db.get(shard.key() for shard in shard_states)
    existing_shard_keys = set(shard.key() for shard in existing_shard_states
                              if shard is not None)

    # Store only the shard states that are not there yet.
    db.put((shard for shard in shard_states
            if shard.key() not in existing_shard_keys),
           config=util.create_datastore_write_config(spec))

    # One output writer per shard; entries stay None without a writer class.
    writer_class = spec.mapper.output_writer_class()
    writers = [None] * len(readers)
    if writer_class:
      for shard_number, shard_state in enumerate(shard_states):
        writers[shard_number] = writer_class.create(mr_state, shard_state)

    # Enqueue the first worker task of every shard.
    for shard_number, (input_reader, output_writer) in enumerate(
        zip(readers, writers)):
      shard_id = model.ShardState.shard_id_from_number(
          spec.mapreduce_id, shard_number)
      task = MapperWorkerCallbackHandler._state_to_task(
          model.TransientShardState(
              base_path, spec, shard_id, 0, input_reader, input_reader,
              output_writer=output_writer,
              handler=spec.mapper.handler),
          shard_states[shard_number])
      MapperWorkerCallbackHandler._add_task(task,
                                            spec,
                                            queue_name)

  @classmethod
  def _check_mr_state(cls, state, mr_id):
    """Check MapreduceState.

    Args:
      state: an MapreduceState instance.
      mr_id: mapreduce id.

    Returns:
      True if state is valid. False if not and this task should be dropped.
    """
    if state is None:
      logging.warning(
          "Mapreduce State for job %s is missing. Dropping Task.",
          mr_id)
      return False
    if not state.active:
      logging.warning(
          "Mapreduce %s is not active. Looks like spurious task "
          "execution. Dropping Task.", mr_id)
      return False
    return True
class StartJobHandler(base_handler.PostJsonHandler):
  """Command handler starts a mapreduce job.

  This handler allows user to start a mr via a web form. Its _start_map
  method can also be used independently to start a mapreduce.
  """

  def handle(self):
    """Handles start request."""
    # Mapper spec as request arguments.
    mapreduce_name = self._get_required_param("name")
    mapper_input_reader_spec = self._get_required_param("mapper_input_reader")
    mapper_handler_spec = self._get_required_param("mapper_handler")
    mapper_output_writer_spec = self.request.get("mapper_output_writer")
    mapper_params = self._get_params(
        "mapper_params_validator", "mapper_params.")
    params = self._get_params(
        "params_validator", "params.")

    # Fill in defaults for values the request did not supply.
    mapper_params["processing_rate"] = int(mapper_params.get(
        "processing_rate") or parameters.config.PROCESSING_RATE_PER_SEC)
    queue_name = mapper_params["queue_name"] = util.get_queue_name(
        mapper_params.get("queue_name", None))

    mapper_spec = model.MapperSpec(
        mapper_handler_spec,
        mapper_input_reader_spec,
        mapper_params,
        int(mapper_params.get("shard_count", parameters.config.SHARD_COUNT)),
        output_writer_spec=mapper_output_writer_spec)

    mapreduce_id = type(self)._start_map(
        mapreduce_name,
        mapper_spec,
        params,
        base_path=self.base_path(),
        queue_name=queue_name,
        _app=mapper_params.get("_app"))
    self.json_response["mapreduce_id"] = mapreduce_id

  def _get_params(self, validator_parameter, name_prefix):
    """Retrieves additional user-supplied params for the job and validates them.

    Args:
      validator_parameter: name of the request parameter which supplies
        validator for this parameter set.
      name_prefix: common prefix for all parameter names in the request.

    Returns:
      A dict mapping each parameter name, with name_prefix stripped, to its
      value: the single value when the request has exactly one, otherwise
      the list of all values.

    Raises:
      Any exception raised by the 'params_validator' request parameter if
      the params fail to validate.
    """
    params_validator = self.request.get(validator_parameter)

    user_params = {}
    for key in self.request.arguments():
      if not key.startswith(name_prefix):
        continue
      values = self.request.get_all(key)
      adjusted_key = key[len(name_prefix):]
      if len(values) == 1:
        user_params[adjusted_key] = values[0]
      else:
        user_params[adjusted_key] = values

    if params_validator:
      resolved_validator = util.for_name(params_validator)
      resolved_validator(user_params)

    return user_params

  def _get_required_param(self, param_name):
    """Get a required request parameter.

    Args:
      param_name: name of request parameter to fetch.

    Returns:
      The parameter value.

    Raises:
      errors.NotEnoughArgumentsError: if parameter is not specified.
    """
    value = self.request.get(param_name)
    if not value:
      raise errors.NotEnoughArgumentsError(param_name + " not specified")
    return value

  @classmethod
  def _start_map(cls,
                 name,
                 mapper_spec,
                 mapreduce_params,
                 base_path,
                 queue_name,
                 eta=None,
                 countdown=None,
                 hooks_class_name=None,
                 _app=None,
                 in_xg_transaction=False):
    """See control.start_map.

    Requirements for this method:
    1. The request that invokes this method can either be regular or
       from taskqueue. So taskqueue specific headers can not be used.
    2. Each invocation transactionally starts an isolated mapreduce job with
       a unique id. MapreduceState should be immediately available after
       returning. See control.start_map's doc on transactional.
    3. Method should be lightweight.

    Returns:
      The id of the newly started mapreduce job.
    """
    # Validate the input reader.
    mapper_input_reader_class = mapper_spec.input_reader_class()
    mapper_input_reader_class.validate(mapper_spec)

    # Validate the output writer, when one is configured.
    mapper_output_writer_class = mapper_spec.output_writer_class()
    if mapper_output_writer_class:
      mapper_output_writer_class.validate(mapper_spec)

    # Create a new id and the mapreduce spec.
    mapreduce_id = model.MapreduceState.new_mapreduce_id()
    mapreduce_spec = model.MapreduceSpec(
        name,
        mapreduce_id,
        mapper_spec.to_json(),
        mapreduce_params,
        hooks_class_name)

    # Resolve the mapper handler with a context installed, and always clear
    # the context again afterwards.
    ctx = context.Context(mapreduce_spec, None)
    context.Context._set(ctx)
    try:
      # Accessed only for its side effect of resolving the handler.
      mapper_spec.handler
    finally:
      context.Context._set(None)

    # Join the caller's transaction when asked to, otherwise start a separate
    # one.
    if in_xg_transaction:
      propagation = db.MANDATORY
    else:
      propagation = db.INDEPENDENT

    @db.transactional(propagation=propagation)
    def _txn():
      """Save the new state and enqueue the kickoff task together."""
      cls._create_and_save_state(mapreduce_spec, _app)
      cls._add_kickoff_task(base_path, mapreduce_spec, eta,
                            countdown, queue_name)
    _txn()

    return mapreduce_id

  @classmethod
  def _create_and_save_state(cls, mapreduce_spec, _app):
    """Save mapreduce state to datastore.

    Save state to datastore so that UI can see it immediately.

    Args:
      mapreduce_spec: model.MapreduceSpec,
      _app: app id if specified. None otherwise.

    Returns:
      The saved Mapreduce state.
    """
    state = model.MapreduceState.create_new(mapreduce_spec.mapreduce_id)
    state.mapreduce_spec = mapreduce_spec
    state.active = True
    state.active_shards = 0
    if _app:
      state.app_id = _app
    config = util.create_datastore_write_config(mapreduce_spec)
    state.put(config=config)
    return state

  @classmethod
  def _add_kickoff_task(cls,
                        base_path,
                        mapreduce_spec,
                        eta,
                        countdown,
                        queue_name):
    """Enqueue the kickoff task for a newly created job.

    The task is handed to the job's hooks first; if there are no hooks, or
    the hooks raise NotImplementedError, it is added to queue_name
    transactionally.

    Args:
      base_path: mapreduce handlers url base path.
      mapreduce_spec: model.MapreduceSpec of the job.
      eta: passed through to taskqueue.Task.
      countdown: passed through to taskqueue.Task.
      queue_name: name of the queue to add the task to.
    """
    params = {"mapreduce_id": mapreduce_spec.mapreduce_id}

    kickoff_task = taskqueue.Task(
        url=base_path + "/kickoffjob_callback",
        headers=util._get_task_headers(mapreduce_spec),
        params=params,
        eta=eta,
        countdown=countdown)
    hooks = mapreduce_spec.get_hooks()
    if hooks is not None:
      try:
        hooks.enqueue_kickoff_task(kickoff_task, queue_name)
        return
      except NotImplementedError:
        # The hooks do not take over kickoff tasks; enqueue it ourselves.
        pass
    kickoff_task.add(queue_name, transactional=True)
class FinalizeJobHandler(base_handler.TaskQueueHandler):
  """Finalize map job by deleting all temporary entities."""

  def handle(self):
    """Delete the control entity and all huge-task payloads of the job.

    Does nothing when the MapreduceState of the job no longer exists.
    """
    mapreduce_id = self.request.get("mapreduce_id")
    mapreduce_state = model.MapreduceState.get_by_job_id(mapreduce_id)
    if not mapreduce_state:
      return

    config = util.create_datastore_write_config(mapreduce_state.mapreduce_spec)
    keys = [model.MapreduceControl.get_key_by_job_id(mapreduce_id)]
    # Payloads stored under each shard state ...
    for ss in model.ShardState.find_all_by_mapreduce_state(mapreduce_state):
      keys.extend(
          model._HugeTaskPayload.all().ancestor(ss).run(keys_only=True))
    # ... and payloads stored under the mapreduce state itself.
    keys.extend(model._HugeTaskPayload.all().ancestor(
        mapreduce_state).run(keys_only=True))
    db.delete(keys, config=config)

  @classmethod
  def schedule(cls, base_path, mapreduce_spec):
    """Schedule finalize task.

    Args:
      base_path: mapreduce handlers url base path.
      mapreduce_spec: mapreduce specification as MapreduceSpec.
    """
    task_name = mapreduce_spec.mapreduce_id + "-finalize"
    finalize_task = taskqueue.Task(
        name=task_name,
        url=base_path + "/finalizejob_callback",
        params={"mapreduce_id": mapreduce_spec.mapreduce_id},
        headers=util._get_task_headers(mapreduce_spec))
    queue_name = util.get_queue_name(None)
    # A user hook may take over enqueueing; only add the task ourselves when
    # it did not.
    if _run_task_hook(mapreduce_spec.get_hooks(),
                      "enqueue_controller_task",
                      finalize_task,
                      queue_name):
      return
    try:
      finalize_task.add(queue_name)
    except (taskqueue.TombstonedTaskError,
            taskqueue.TaskAlreadyExistsError) as e:
      # The task is named after the job, so a duplicate means finalization
      # was already scheduled; log and carry on.
      logging.warning("Task %r already exists. %s: %s",
                      task_name, e.__class__, e)
class CleanUpJobHandler(base_handler.PostJsonHandler):
  """Command to kick off tasks to clean up a job's data."""

  def handle(self):
    """Delete the job's shard states and its mapreduce state, if present.

    The status message is written to the json response whether or not a
    state was found.
    """
    mapreduce_id = self.request.get("mapreduce_id")

    mapreduce_state = model.MapreduceState.get_by_job_id(mapreduce_id)
    if mapreduce_state:
      shard_keys = model.ShardState.calculate_keys_by_mapreduce_state(
          mapreduce_state)
      db.delete(shard_keys)
      db.delete(mapreduce_state)
    self.json_response["status"] = ("Job %s successfully cleaned up." %
                                    mapreduce_id)
class AbortJobHandler(base_handler.PostJsonHandler):
  """Command to abort a running job."""

  def handle(self):
    """Send the abort signal for the job named by "mapreduce_id"."""
    model.MapreduceControl.abort(self.request.get("mapreduce_id"))
    self.json_response["status"] = "Abort signal sent."