#!/usr/bin/env python
#
# Copyright 2007 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
32 """Defines executor tasks handlers for MapReduce implementation."""
38 import datetime
39 import logging
40 import math
41 import os
42 import random
43 import sys
44 import time
45 import traceback
46 import simplejson
48 from google.appengine.ext import ndb
50 from google.appengine import runtime
51 from google.appengine.api import datastore_errors
52 from google.appengine.api import logservice
53 from google.appengine.api import modules
54 from google.appengine.api import taskqueue
55 from google.appengine.ext import db
56 from google.appengine.ext.mapreduce import base_handler
57 from google.appengine.ext.mapreduce import context
58 from google.appengine.ext.mapreduce import errors
59 from google.appengine.ext.mapreduce import input_readers
60 from google.appengine.ext.mapreduce import model
61 from google.appengine.ext.mapreduce import operation
62 from google.appengine.ext.mapreduce import parameters
63 from google.appengine.ext.mapreduce import util
64 from google.appengine.runtime import apiproxy_errors
67 try:
68 from google.appengine.ext import cloudstorage
73 if hasattr(cloudstorage, "_STUB"):
74 cloudstorage = None
75 except ImportError:
76 cloudstorage = None
88 _TEST_INJECTED_FAULTS = set()
def _run_task_hook(hooks, method, task, queue_name):
  """Invokes hooks.method(task, queue_name).

  Args:
    hooks: A hooks.Hooks instance or None.
    method: The name of the method to invoke on the hooks class e.g.
      "enqueue_kickoff_task".
    task: The taskqueue.Task to pass to the hook method.
    queue_name: The name of the queue to pass to the hook method.

  Returns:
    True if the hooks.Hooks instance handled the method, False otherwise.
  """
  if hooks is not None:
    try:
      getattr(hooks, method)(task, queue_name)
    except NotImplementedError:
      # Fall back to the default task addition implementation.
      return False
    return True
  return False
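
# Illustrative sketch (not part of the SDK): a hypothetical hooks class only
# implements the hook methods it cares about; unimplemented methods raise
# NotImplementedError, so _run_task_hook falls back to default task addition.
#
#   class MyHooks(hooks.Hooks):
#     def enqueue_worker_task(self, task, queue_name):
#       task.add(queue_name + "-custom")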

class MapperWorkerCallbackHandler(base_handler.HugeTaskHandler):
  """Callback handler for mapreduce worker task."""

  _TASK_DIRECTIVE = util._enum(
      PROCEED_TASK="proceed_task",
      RETRY_TASK="retry_task",
      DROP_TASK="drop_task",
      RECOVER_SLICE="recover_slice",
      RETRY_SHARD="retry_shard",
      FAIL_TASK="fail_task")

  def __init__(self, *args):
    """Constructor."""
    super(MapperWorkerCallbackHandler, self).__init__(*args)
    self._time = time.time
  def _drop_gracefully(self):
    """Drop worker task gracefully.

    Set current shard_state to failed. Controller logic will take care of
    other shards and the entire MR.
    """
    shard_id = self.request.headers[util._MR_SHARD_ID_TASK_HEADER]
    mr_id = self.request.headers[util._MR_ID_TASK_HEADER]
    shard_state, mr_state = db.get([
        model.ShardState.get_key_by_shard_id(shard_id),
        model.MapreduceState.get_key_by_job_id(mr_id)])

    if shard_state and shard_state.active:
      shard_state.set_for_failure()
      config = util.create_datastore_write_config(mr_state.mapreduce_spec)
      shard_state.put(config=config)
  def _try_acquire_lease(self, shard_state, tstate):
    """Validate that datastore and the task payload are consistent.

    If so, attempt to get a lease on this slice's execution.
    See model.ShardState doc on slice_start_time.

    Args:
      shard_state: model.ShardState from datastore.
      tstate: model.TransientShardState from taskqueue payload.

    Returns:
      A _TASK_DIRECTIVE enum. PROCEED_TASK if lock is acquired.
      RETRY_TASK if task should be retried, DROP_TASK if task should
      be dropped. Only old tasks (compared to datastore state)
      will be dropped. Future tasks are retried until they naturally
      become old, so that the MR never gets stuck.
    """
    if not shard_state:
      logging.warning("State not found for shard %s; Possible spurious task "
                      "execution. Dropping this task.",
                      tstate.shard_id)
      return self._TASK_DIRECTIVE.DROP_TASK

    if not shard_state.active:
      logging.warning("Shard %s is not active. Possible spurious task "
                      "execution. Dropping this task.", tstate.shard_id)
      logging.warning(str(shard_state))
      return self._TASK_DIRECTIVE.DROP_TASK

    # Validate shard retry count.
    if shard_state.retries > tstate.retries:
      logging.warning(
          "Got shard %s from previous shard retry %s. Possible spurious "
          "task execution. Dropping this task.",
          tstate.shard_id,
          tstate.retries)
      logging.warning(str(shard_state))
      return self._TASK_DIRECTIVE.DROP_TASK
    elif shard_state.retries < tstate.retries:
      # The task is ahead of datastore; wait for shard state to catch up.
      logging.warning(
          "ShardState for %s is behind slice. Waiting for it to catch up",
          shard_state.shard_id)
      return self._TASK_DIRECTIVE.RETRY_TASK

    # Validate slice id. Taskqueue may re-execute old successful tasks.
    if shard_state.slice_id > tstate.slice_id:
      logging.warning(
          "Task %s-%s is behind ShardState %s. Dropping task.",
          tstate.shard_id, tstate.slice_id, shard_state.slice_id)
      return self._TASK_DIRECTIVE.DROP_TASK
    elif shard_state.slice_id < tstate.slice_id:
      logging.warning(
          "Task %s-%s is ahead of ShardState %s. Waiting for it to catch up.",
          tstate.shard_id, tstate.slice_id, shard_state.slice_id)
      return self._TASK_DIRECTIVE.RETRY_TASK

    # Check for a potential duplicated task for the same slice.
    # See model.ShardState doc.
    if shard_state.slice_start_time:
      countdown = self._wait_time(shard_state,
                                  parameters.config._LEASE_GRACE_PERIOD +
                                  parameters.config._SLICE_DURATION_SEC)
      if countdown > 0:
        logging.warning(
            "Last retry of slice %s-%s may still be running. "
            "Will try again in %s seconds", tstate.shard_id, tstate.slice_id,
            countdown)
        # Wait out the lease here rather than setting a task countdown.
        time.sleep(countdown)
        return self._TASK_DIRECTIVE.RETRY_TASK
      # Lease could have expired. Verify with the Logs API.
      else:
        if self._wait_time(shard_state,
                           parameters.config._REQUEST_EVENTUAL_TIMEOUT):
          if not self._has_old_request_ended(shard_state):
            logging.warning(
                "Last retry of slice %s-%s is still in flight with request_id "
                "%s. Will try again later.", tstate.shard_id, tstate.slice_id,
                shard_state.slice_request_id)
            return self._TASK_DIRECTIVE.RETRY_TASK
        else:
          logging.warning(
              "Last retry of slice %s-%s has no log entry and has "
              "timed out after %s seconds",
              tstate.shard_id, tstate.slice_id,
              parameters.config._REQUEST_EVENTUAL_TIMEOUT)

    # Lease is free or expired; try to acquire it.
    config = util.create_datastore_write_config(tstate.mapreduce_spec)

    @db.transactional(retries=5)
    def _tx():
      """Use datastore to set slice_start_time to now.

      If failed for any reason, raise error to retry the task (hence all
      the previous validation code). The task would die naturally eventually.

      Returns:
        A _TASK_DIRECTIVE enum.
      """
      fresh_state = model.ShardState.get_by_shard_id(tstate.shard_id)
      if not fresh_state:
        logging.warning("ShardState missing.")
        raise db.Rollback()
      if (fresh_state.active and
          fresh_state.slice_id == shard_state.slice_id and
          fresh_state.slice_start_time == shard_state.slice_start_time):
        shard_state.slice_start_time = datetime.datetime.now()
        shard_state.slice_request_id = os.environ.get("REQUEST_LOG_ID")
        shard_state.acquired_once = True
        shard_state.put(config=config)
        return self._TASK_DIRECTIVE.PROCEED_TASK
      else:
        logging.warning(
            "Contention on slice %s-%s execution. Will retry again.",
            tstate.shard_id, tstate.slice_id)
        # One proposer should win. In case all lost, back off arbitrarily.
        time.sleep(random.randrange(1, 5))
        return self._TASK_DIRECTIVE.RETRY_TASK

    return _tx()
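
  # Summary of the lease protocol above (descriptive, derived from the code):
  #   task retries  > shard retries  -> RETRY_TASK (shard state lags behind)
  #   task retries  < shard retries  -> DROP_TASK  (task is from an old retry)
  #   task slice_id > shard slice_id -> RETRY_TASK (shard state lags behind)
  #   task slice_id < shard slice_id -> DROP_TASK  (task already executed)
  #   ids equal, lease held          -> sleep out the lease, then RETRY_TASK
  #   ids equal, lease free/expired  -> transactionally acquire, PROCEED_TASK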
  def _has_old_request_ended(self, shard_state):
    """Whether previous slice retry has ended according to Logs API.

    Args:
      shard_state: shard state.

    Returns:
      True if the request of the previous slice retry has ended. False if it
      has not ended or that is unknown.
    """
    assert shard_state.slice_start_time is not None
    assert shard_state.slice_request_id is not None
    request_ids = [shard_state.slice_request_id]
    logs = list(logservice.fetch(
        request_ids=request_ids,
        module_versions=[(os.environ["CURRENT_MODULE_ID"],
                          modules.get_current_version_name())]))

    if not logs or not logs[0].finished:
      return False
    return True
  def _wait_time(self, shard_state, secs, now=datetime.datetime.now):
    """Time to wait until slice_start_time is secs ago from now.

    Args:
      shard_state: shard state.
      secs: duration in seconds.
      now: a func that gets now.

    Returns:
      0 if no wait. A positive int in seconds otherwise. Always rounds up.
    """
    assert shard_state.slice_start_time is not None
    delta = now() - shard_state.slice_start_time
    duration = datetime.timedelta(seconds=secs)
    if delta < duration:
      return util.total_seconds(duration - delta)
    else:
      return 0
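
  # Worked example (illustrative values): with secs=16 (e.g. a 1s grace
  # period plus a 15s slice duration) and a slice_start_time 10.2s ago,
  # duration - delta is 5.8s, which rounds up to 6, so the caller waits
  # 6 seconds; once 16s have elapsed the method returns 0.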
  def _try_free_lease(self, shard_state, slice_retry=False):
    """Try to free lease.

    A lightweight transaction to update shard_state and unset
    slice_start_time to allow the next retry to happen without blocking.
    We don't care if this fails or not because the lease will expire
    anyway.

    Under normal execution, _save_state_and_schedule_next is the exit point.
    It updates/saves shard state and schedules the next slice or returns.
    Other exit points are:
    1. _are_states_consistent: at the beginning of handle, checks
       if datastore states and the task are in sync.
       If not, raise or return.
    2. _attempt_slice_retry: may raise exception to taskqueue.
    3. _save_state_and_schedule_next: may raise exception when taskqueue/db
       unreachable.

    This handler should try to free the lease on every exceptional exit point.

    Args:
      shard_state: model.ShardState.
      slice_retry: whether to count this as a failed slice execution.
    """
    @db.transactional
    def _tx():
      fresh_state = model.ShardState.get_by_shard_id(shard_state.shard_id)
      if fresh_state and fresh_state.active:
        # Free the lease.
        fresh_state.slice_start_time = None
        fresh_state.slice_request_id = None
        if slice_retry:
          fresh_state.slice_retries += 1
        fresh_state.put()
    try:
      _tx()
    # Failure to free the lease is tolerable; it will expire on its own.
    except Exception, e:
      logging.warning(e)
      logging.warning(
          "Release lock for shard %s failed. Wait for lease to expire.",
          shard_state.shard_id)
  def handle(self):
    """Handle request.

    This method has to be careful to pass the same ShardState instance to
    its subroutine calls if the calls mutate or read from ShardState.
    Note especially that the Context instance caches and updates the
    ShardState instance.

    Returns:
      Set HTTP status code and always returns None.
    """
    # Deserialize transient and permanent states.
    self._start_time = self._time()
    shard_id = self.request.headers[util._MR_SHARD_ID_TASK_HEADER]
    mr_id = self.request.headers[util._MR_ID_TASK_HEADER]
    spec = model.MapreduceSpec._get_mapreduce_spec(mr_id)
    shard_state, control = db.get([
        model.ShardState.get_key_by_shard_id(shard_id),
        model.MapreduceControl.get_key_by_job_id(mr_id),
    ])

    # Set context before any IO code is called.
    ctx = context.Context(spec, shard_state,
                          task_retry_count=self.task_retry_count())
    context.Context._set(ctx)

    # Unmarshal the input reader, output writer, and other transient state.
    tstate = model.TransientShardState.from_request(self.request)

    # Try to acquire a lease on this slice before doing any work.
    if shard_state:
      is_this_a_retry = shard_state.acquired_once
    task_directive = self._try_acquire_lease(shard_state, tstate)
    if task_directive == self._TASK_DIRECTIVE.RETRY_TASK:
      return self.retry_task()
    if task_directive == self._TASK_DIRECTIVE.DROP_TASK:
      return
    assert task_directive == self._TASK_DIRECTIVE.PROCEED_TASK

    # Abort shard if abort signal was received.
    if control and control.command == model.MapreduceControl.ABORT:
      logging.info("Abort command received by shard %d of job '%s'",
                   shard_state.shard_number, shard_state.mapreduce_id)
      # NOTE: When aborting, specifically do not finalize the output writer
      # because it might be in a bad state.
      shard_state.set_for_abort()
      shard_state.put(config=util.create_datastore_write_config(spec))
      return

    # Disable NDB caching for this request.
    util._set_ndb_cache_policy()
    if cloudstorage:
      cloudstorage.set_default_retry_params(
          cloudstorage.RetryParams(
              urlfetch_timeout=parameters._GCS_URLFETCH_TIMEOUT_SEC))

    try:
      if is_this_a_retry:
        task_directive = self._attempt_slice_recovery(shard_state, tstate)

      if task_directive == self._TASK_DIRECTIVE.PROCEED_TASK:
        finished_shard = self._process_inputs(
            tstate.input_reader, shard_state, tstate, ctx)
        if finished_shard:
          # Since no exception was raised, the output writer can be
          # finalized safely; otherwise it might be stuck in a bad state.
          if tstate.output_writer:
            tstate.output_writer.finalize(ctx, shard_state)
          shard_state.set_for_success()
    except Exception, e:
      logging.warning("Shard %s got error.", shard_state.shard_id)
      logging.error(traceback.format_exc())

      # Fail the job fast on FailJobError; retry the slice otherwise.
      if type(e) is errors.FailJobError:
        logging.error("Got FailJobError.")
        task_directive = self._TASK_DIRECTIVE.FAIL_TASK
      else:
        task_directive = self._TASK_DIRECTIVE.RETRY_TASK

    task_directive = self._set_state(shard_state, tstate, task_directive)
    self._save_state_and_schedule_next(shard_state, tstate, task_directive)
  def _process_inputs(self,
                      input_reader,
                      shard_state,
                      tstate,
                      ctx):
    """Read inputs, process them, and write out outputs.

    This is the core logic of MapReduce. It reads inputs from the input
    reader, invokes the user-specified mapper function, and writes output
    with the output writer. It also updates shard_state accordingly,
    e.g. if shard processing is done, it sets shard_state.active to False.

    If errors.FailJobError is caught, it will fail this MR job.
    All other exceptions will be logged and raised to taskqueue for retry
    until the number of retries exceeds a limit.

    Args:
      input_reader: input reader.
      shard_state: shard state.
      tstate: transient shard state.
      ctx: mapreduce context.

    Returns:
      Whether this shard has finished processing all its input split.
    """
    processing_limit = self._processing_limit(tstate.mapreduce_spec)
    if processing_limit == 0:
      return
    finished_shard = True
    # The input reader may not be an iterator. It is only a container.
    iterator = iter(input_reader)

    while True:
      try:
        entity = iterator.next()
      except StopIteration:
        break

      # Record the last work item seen, for status reporting.
      if isinstance(entity, db.Model):
        shard_state.last_work_item = repr(entity.key())
      elif isinstance(entity, ndb.Model):
        shard_state.last_work_item = repr(entity.key)
      else:
        shard_state.last_work_item = repr(entity)[:100]

      processing_limit -= 1

      if not self._process_datum(
          entity, input_reader, ctx, tstate):
        finished_shard = False
        break
      elif processing_limit == 0:
        finished_shard = False
        break

    # Record wall time and flush the context and its pools.
    operation.counters.Increment(
        context.COUNTER_MAPPER_WALLTIME_MS,
        int((self._time() - self._start_time)*1000))(ctx)
    ctx.flush()

    return finished_shard
  def _process_datum(self, data, input_reader, ctx, transient_shard_state):
    """Process a single data piece.

    Call mapper handler on the data.

    Args:
      data: a datum to process.
      input_reader: input reader.
      ctx: mapreduce context.
      transient_shard_state: transient shard state.

    Returns:
      True if scan should be continued, False if scan should be stopped.
    """
    if data is not input_readers.ALLOW_CHECKPOINT:
      ctx.counters.increment(context.COUNTER_MAPPER_CALLS)

      handler = transient_shard_state.handler

      if input_reader.expand_parameters:
        result = handler(*data)
      else:
        result = handler(data)

      if util.is_generator(result):
        for output in result:
          if isinstance(output, operation.Operation):
            output(ctx)
          else:
            output_writer = transient_shard_state.output_writer
            if not output_writer:
              logging.warning(
                  "Handler yielded %s, but no output writer is set.", output)
            else:
              output_writer.write(output)

    if self._time() - self._start_time >= parameters.config._SLICE_DURATION_SEC:
      return False
    return True
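
  # Illustrative sketch (not part of the SDK): a generator mapper handler may
  # yield both operations (applied to the context) and plain values (sent to
  # the output writer), exactly as _process_datum dispatches them above.
  #
  #   def word_count_map(entity):  # hypothetical user handler
  #     for word in entity.text.split():
  #       yield operation.counters.Increment("words")
  #       yield "%s\n" % word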
  def _set_state(self, shard_state, tstate, task_directive):
    """Set shard_state and tstate based on task_directive.

    Args:
      shard_state: model.ShardState for current shard.
      tstate: model.TransientShardState for current shard.
      task_directive: self._TASK_DIRECTIVE for current shard.

    Returns:
      A _TASK_DIRECTIVE enum. PROCEED_TASK if task should proceed normally.
      RETRY_SHARD if shard should be retried. RETRY_TASK if slice should be
      retried. FAIL_TASK if the job should fail. RECOVER_SLICE if slice
      should be recovered.
    """
    if task_directive == self._TASK_DIRECTIVE.PROCEED_TASK:
      shard_state.advance_for_next_slice()
      tstate.advance_for_next_slice()
      return task_directive

    if task_directive == self._TASK_DIRECTIVE.RECOVER_SLICE:
      tstate.advance_for_next_slice(recovery_slice=True)
      shard_state.advance_for_next_slice(recovery_slice=True)
      return task_directive

    if task_directive == self._TASK_DIRECTIVE.RETRY_TASK:
      task_directive = self._attempt_slice_retry(shard_state, tstate)
    if task_directive == self._TASK_DIRECTIVE.RETRY_SHARD:
      task_directive = self._attempt_shard_retry(shard_state, tstate)
    if task_directive == self._TASK_DIRECTIVE.FAIL_TASK:
      shard_state.set_for_failure()
    return task_directive
  def _save_state_and_schedule_next(self, shard_state, tstate, task_directive):
    """Save state and schedule task.

    Save shard state to datastore.
    Schedule next slice if needed.
    Set HTTP response code.
    No modification to any shard_state or tstate.

    Args:
      shard_state: model.ShardState for current shard.
      tstate: model.TransientShardState for current shard.
      task_directive: enum _TASK_DIRECTIVE.
    """
    spec = tstate.mapreduce_spec

    if task_directive == self._TASK_DIRECTIVE.RETRY_TASK:
      # Set HTTP code to 500.
      return self.retry_task()
    elif task_directive == self._TASK_DIRECTIVE.FAIL_TASK:
      logging.critical("Shard %s failed permanently.", shard_state.shard_id)
      task = None
    elif task_directive == self._TASK_DIRECTIVE.RETRY_SHARD:
      logging.warning("Shard %s is going to be attempted for the %s time.",
                      shard_state.shard_id,
                      shard_state.retries + 1)
      task = self._state_to_task(tstate, shard_state)
    elif task_directive == self._TASK_DIRECTIVE.RECOVER_SLICE:
      logging.warning("Shard %s slice %s is being recovered.",
                      shard_state.shard_id,
                      shard_state.slice_id)
      task = self._state_to_task(tstate, shard_state)
    else:
      assert task_directive == self._TASK_DIRECTIVE.PROCEED_TASK
      countdown = self._get_countdown_for_next_slice(spec)
      task = self._state_to_task(tstate, shard_state, countdown=countdown)

    # Prefer the queue we are currently running on.
    queue_name = os.environ.get("HTTP_X_APPENGINE_QUEUENAME",
                                "default")
    config = util.create_datastore_write_config(spec)

    @db.transactional(retries=5)
    def _tx():
      fresh_shard_state = model.ShardState.get_by_shard_id(tstate.shard_id)
      if not fresh_shard_state:
        raise db.Rollback()
      if (not fresh_shard_state.active or
          "worker_active_state_collision" in _TEST_INJECTED_FAULTS):
        logging.warning("Shard %s is not active. Possible spurious task "
                        "execution. Dropping this task.", tstate.shard_id)
        logging.warning("Datastore's %s", str(fresh_shard_state))
        logging.warning("Slice's %s", str(shard_state))
        return
      fresh_shard_state.copy_from(shard_state)
      fresh_shard_state.put(config=config)
      # Schedule the next task inside the same datastore transaction so
      # taskqueue never gets ahead of datastore state.
      if fresh_shard_state.active:
        # The task is named, so it cannot be added transactionally.
        self._add_task(task, spec, queue_name)

    try:
      _tx()
    except (datastore_errors.Error,
            taskqueue.Error,
            runtime.DeadlineExceededError,
            apiproxy_errors.Error), e:
      logging.warning(
          "Can't transactionally continue shard. "
          "Will retry slice %s %s for the %s time.",
          tstate.shard_id,
          tstate.slice_id,
          self.task_retry_count() + 1)
      self._try_free_lease(shard_state)
      raise e
  def _attempt_slice_recovery(self, shard_state, tstate):
    """Recover a slice.

    This is run when a slice had been previously attempted and output
    may have been written. If an output writer requires slice recovery,
    we run that logic to remove output duplicates. Otherwise we just retry
    the slice.

    If recovery is needed, then the entire slice will be dedicated
    to recovery logic. No data processing will take place. Thus we call
    the slice a "recovery slice". This is needed for correctness:
    An output writer instance can be out of sync from its physical
    medium only when the slice dies after acquiring the shard lock but before
    committing shard state to db. The worst failure case is when
    shard state failed to commit after the NAMED task for the next slice was
    added. Thus, the recovery slice has special logic to increment the current
    slice_id n to n+2. If the task for n+1 had been added, it will be dropped
    because it is behind shard state.

    Args:
      shard_state: an instance of Model.ShardState.
      tstate: an instance of Model.TransientShardState.

    Returns:
      _TASK_DIRECTIVE.PROCEED_TASK to continue with this retry.
      _TASK_DIRECTIVE.RECOVER_SLICE to recover this slice.
      The next slice will start at the same input as
      this slice but output to a new instance of output writer.
      Combining outputs from all writer instances is up to implementation.
    """
    mapper_spec = tstate.mapreduce_spec.mapper
    if not (tstate.output_writer and
            tstate.output_writer._supports_slice_recovery(mapper_spec)):
      return self._TASK_DIRECTIVE.PROCEED_TASK

    tstate.output_writer = tstate.output_writer._recover(
        tstate.mapreduce_spec, shard_state.shard_number,
        shard_state.retries + 1)
    return self._TASK_DIRECTIVE.RECOVER_SLICE
  def _attempt_shard_retry(self, shard_state, tstate):
    """Whether to retry shard.

    This method may modify shard_state and tstate to prepare for retry or fail.

    Args:
      shard_state: model.ShardState for current shard.
      tstate: model.TransientShardState for current shard.

    Returns:
      A _TASK_DIRECTIVE enum. RETRY_SHARD if shard should be retried.
      FAIL_TASK otherwise.
    """
    shard_attempts = shard_state.retries + 1

    if shard_attempts >= parameters.config.SHARD_MAX_ATTEMPTS:
      logging.warning(
          "Shard attempt %s exceeded %s max attempts.",
          shard_attempts, parameters.config.SHARD_MAX_ATTEMPTS)
      return self._TASK_DIRECTIVE.FAIL_TASK
    if tstate.output_writer and (
        not tstate.output_writer._supports_shard_retry(tstate)):
      logging.warning("Output writer %s does not support shard retry.",
                      tstate.output_writer.__class__.__name__)
      return self._TASK_DIRECTIVE.FAIL_TASK

    shard_state.reset_for_retry()
    logging.warning("Shard %s attempt %s failed; retrying (up to %s attempts).",
                    shard_state.shard_id,
                    shard_state.retries,
                    parameters.config.SHARD_MAX_ATTEMPTS)
    output_writer = None
    if tstate.output_writer:
      output_writer = tstate.output_writer.create(
          tstate.mapreduce_spec, shard_state.shard_number, shard_attempts + 1)
    tstate.reset_for_retry(output_writer)
    return self._TASK_DIRECTIVE.RETRY_SHARD
  def _attempt_slice_retry(self, shard_state, tstate):
    """Attempt to retry this slice.

    This method may modify shard_state and tstate to prepare for retry or fail.

    Args:
      shard_state: model.ShardState for current shard.
      tstate: model.TransientShardState for current shard.

    Returns:
      A _TASK_DIRECTIVE enum. RETRY_TASK if slice should be retried.
      RETRY_SHARD if shard retry should be attempted.
    """
    if (shard_state.slice_retries + 1 <
        parameters.config.TASK_MAX_DATA_PROCESSING_ATTEMPTS):
      logging.warning(
          "Slice %s %s failed for the %s of up to %s attempts "
          "(%s of %s taskqueue execution attempts). "
          "Will retry now.",
          tstate.shard_id,
          tstate.slice_id,
          shard_state.slice_retries + 1,
          parameters.config.TASK_MAX_DATA_PROCESSING_ATTEMPTS,
          self.task_retry_count() + 1,
          parameters.config.TASK_MAX_ATTEMPTS)
      # Clear the current exception info; this slice is retried from scratch.
      sys.exc_clear()
      self._try_free_lease(shard_state, slice_retry=True)
      return self._TASK_DIRECTIVE.RETRY_TASK

    if parameters.config.TASK_MAX_DATA_PROCESSING_ATTEMPTS > 0:
      logging.warning("Slice attempt %s exceeded %s max attempts.",
                      self.task_retry_count() + 1,
                      parameters.config.TASK_MAX_DATA_PROCESSING_ATTEMPTS)
    return self._TASK_DIRECTIVE.RETRY_SHARD
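
  # Worked example (illustrative): with TASK_MAX_DATA_PROCESSING_ATTEMPTS of,
  # say, 11, a slice whose slice_retries is currently 3 logs attempt "4 of up
  # to 11", frees the lease, and returns RETRY_TASK; only once
  # slice_retries + 1 reaches the limit does RETRY_SHARD escalate the failure
  # to a whole-shard retry.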
  @staticmethod
  def get_task_name(shard_id, slice_id, retry=0):
    """Compute single worker task name.

    Args:
      shard_id: shard id.
      slice_id: slice id.
      retry: current shard retry count.

    Returns:
      task name which should be used to process specified shard/slice.
    """
    # Prefix the task name with something unique to this framework's
    # namespace so that it doesn't conflict with user tasks on the queue.
    return "appengine-mrshard-%s-%s-retry-%s" % (
        shard_id, slice_id, retry)
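
  # Example (illustrative): get_task_name("1234567890-0", 5, retry=1) returns
  # "appengine-mrshard-1234567890-0-5-retry-1", so a duplicate enqueue of the
  # same shard/slice/retry is rejected by taskqueue as an already-seen name.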
  def _get_countdown_for_next_slice(self, spec):
    """Get countdown for next slice's task.

    When user sets processing rate, we set countdown to delay task execution.

    Args:
      spec: model.MapreduceSpec

    Returns:
      countdown in int.
    """
    countdown = 0
    if self._processing_limit(spec) != -1:
      countdown = max(
          int(parameters.config._SLICE_DURATION_SEC -
              (self._time() - self._start_time)), 0)
    return countdown
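
  # Worked example (illustrative): with a 15-second _SLICE_DURATION_SEC and a
  # slice that hit its processing limit after 4 seconds, the next task runs
  # with a countdown of max(int(15 - 4), 0) = 11 seconds, spreading work out
  # to match the requested processing rate.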
  @classmethod
  def _state_to_task(cls,
                     tstate,
                     shard_state,
                     eta=None,
                     countdown=None):
    """Generate task for slice according to current states.

    Args:
      tstate: An instance of TransientShardState.
      shard_state: An instance of ShardState.
      eta: Absolute time when the MR should execute. May not be specified
        if 'countdown' is also supplied. This may be timezone-aware or
        timezone-naive.
      countdown: Time in seconds into the future that this MR should execute.
        Defaults to zero.

    Returns:
      A model.HugeTask instance for the slice specified by current states.
    """
    base_path = tstate.base_path

    task_name = MapperWorkerCallbackHandler.get_task_name(
        tstate.shard_id,
        tstate.slice_id,
        tstate.retries)

    headers = util._get_task_headers(tstate.mapreduce_spec)
    headers[util._MR_SHARD_ID_TASK_HEADER] = tstate.shard_id

    worker_task = model.HugeTask(
        url=base_path + "/worker_callback/" + tstate.shard_id,
        params=tstate.to_dict(),
        name=task_name,
        eta=eta,
        countdown=countdown,
        parent=shard_state,
        headers=headers)
    return worker_task
  @classmethod
  def _add_task(cls,
                worker_task,
                mapreduce_spec,
                queue_name):
    """Schedule slice scanning by adding it to the task queue.

    Args:
      worker_task: a model.HugeTask task for slice. This is NOT a taskqueue
        task.
      mapreduce_spec: an instance of model.MapreduceSpec.
      queue_name: Optional queue to run on; uses the current queue of
        execution or the default queue if unspecified.
    """
    if not _run_task_hook(mapreduce_spec.get_hooks(),
                          "enqueue_worker_task",
                          worker_task,
                          queue_name):
      try:
        # Not adding transactionally because worker_task has a name, and
        # named tasks are not allowed in transactional adds.
        worker_task.add(queue_name)
      except (taskqueue.TombstonedTaskError,
              taskqueue.TaskAlreadyExistsError), e:
        logging.warning("Task %r already exists. %s: %s",
                        worker_task.name,
                        e.__class__,
                        e)
  def _processing_limit(self, spec):
    """Get the limit on the number of map calls allowed by this slice.

    Args:
      spec: a Mapreduce spec.

    Returns:
      The limit as a positive int if specified by user. -1 otherwise.
    """
    processing_rate = float(spec.mapper.params.get("processing_rate", 0))
    slice_processing_limit = -1
    if processing_rate > 0:
      slice_processing_limit = int(math.ceil(
          parameters.config._SLICE_DURATION_SEC*processing_rate/
          int(spec.mapper.shard_count)))
    return slice_processing_limit
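
  # Worked example (illustrative): processing_rate=1000 entities/sec across
  # 8 shards with a 15-second slice duration gives ceil(15 * 1000 / 8) = 1875
  # map calls allowed per slice for each shard.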
  @classmethod
  def _schedule_slice(cls,
                      shard_state,
                      tstate,
                      queue_name=None,
                      eta=None,
                      countdown=None):
    """Schedule slice scanning by adding it to the task queue.

    Args:
      shard_state: An instance of ShardState.
      tstate: An instance of TransientShardState.
      queue_name: Optional queue to run on; uses the current queue of
        execution or the default queue if unspecified.
      eta: Absolute time when the MR should execute. May not be specified
        if 'countdown' is also supplied. This may be timezone-aware or
        timezone-naive.
      countdown: Time in seconds into the future that this MR should execute.
        Defaults to zero.
    """
    queue_name = queue_name or os.environ.get("HTTP_X_APPENGINE_QUEUENAME",
                                              "default")
    task = cls._state_to_task(tstate, shard_state, eta, countdown)
    cls._add_task(task, tstate.mapreduce_spec, queue_name)

class ControllerCallbackHandler(base_handler.HugeTaskHandler):
  """Supervises mapreduce execution.

  It is also responsible for gathering execution status from the shards.

  This task is "continuously" running by adding itself again to taskqueue if
  and only if the mapreduce is still active. A mapreduce is active if it has
  actively running shards.
  """

  def __init__(self, *args):
    """Constructor."""
    super(ControllerCallbackHandler, self).__init__(*args)
    self._time = time.time
  def _drop_gracefully(self):
    """Gracefully drop controller task.

    This method is called when decoding controller task payload failed.
    Upon this we mark ShardState and MapreduceState as failed so all
    tasks can stop.

    Writing to datastore is forced (read-only mode is ignored) because we
    badly want the tasks to stop, and if force_writes were False,
    the job would never have been started anyway.
    """
    mr_id = self.request.headers[util._MR_ID_TASK_HEADER]
    state = model.MapreduceState.get_by_job_id(mr_id)
    if not state or not state.active:
      return

    state.active = False
    state.result_status = model.MapreduceState.RESULT_FAILED
    config = util.create_datastore_write_config(state.mapreduce_spec)
    puts = []
    for ss in model.ShardState.find_all_by_mapreduce_state(state):
      if ss.active:
        ss.set_for_failure()
        puts.append(ss)
        # Avoid keeping too many shard states in memory; flush in batches.
        if len(puts) > model.ShardState._MAX_STATES_IN_MEMORY:
          db.put(puts, config=config)
          puts = []
    db.put(puts, config=config)
    # Put mr_state only after all shard states are saved.
    db.put(state, config=config)
  def handle(self):
    """Handle request."""
    spec = model.MapreduceSpec.from_json_str(
        self.request.get("mapreduce_spec"))
    state, control = db.get([
        model.MapreduceState.get_key_by_job_id(spec.mapreduce_id),
        model.MapreduceControl.get_key_by_job_id(spec.mapreduce_id),
    ])

    if not state:
      logging.warning("State not found for MR '%s'; dropping controller task.",
                      spec.mapreduce_id)
      return
    if not state.active:
      logging.warning(
          "MR %r is not active. Looks like spurious controller task execution.",
          spec.mapreduce_id)
      self._clean_up_mr(spec)
      return

    shard_states = model.ShardState.find_all_by_mapreduce_state(state)
    self._update_state_from_shard_states(state, shard_states, control)

    if state.active:
      ControllerCallbackHandler.reschedule(
          state, spec, self.serial_id() + 1)
  def _update_state_from_shard_states(self, state, shard_states, control):
    """Update mr state by examining shard states.

    Args:
      state: current mapreduce state as MapreduceState.
      shard_states: an iterator over shard states.
      control: model.MapreduceControl entity.
    """
    # Initialize vars.
    state.active_shards, state.aborted_shards, state.failed_shards = 0, 0, 0
    total_shards = 0
    processed_counts = []
    state.counters_map.clear()

    # Tally across shard states once.
    for s in shard_states:
      total_shards += 1
      if s.active:
        state.active_shards += 1
      if s.result_status == model.ShardState.RESULT_ABORTED:
        state.aborted_shards += 1
      elif s.result_status == model.ShardState.RESULT_FAILED:
        state.failed_shards += 1

      # Update stats in mapreduce state by aggregating stats from shards.
      state.counters_map.add_map(s.counters_map)
      processed_counts.append(s.counters_map.get(context.COUNTER_MAPPER_CALLS))

    state.set_processed_counts(processed_counts)
    state.last_poll_time = datetime.datetime.utcfromtimestamp(self._time())

    spec = state.mapreduce_spec

    if total_shards != spec.mapper.shard_count:
      logging.error("Found %d shard states. Expect %d. "
                    "Issuing abort command to job '%s'",
                    total_shards, spec.mapper.shard_count,
                    spec.mapreduce_id)
      # Issue an abort command so the shards stop themselves.
      model.MapreduceControl.abort(spec.mapreduce_id)

    # If any shard is active, the mr is active.
    state.active = bool(state.active_shards)
    if not control and (state.failed_shards or state.aborted_shards):
      # Issue abort command if there are failed or aborted shards.
      model.MapreduceControl.abort(spec.mapreduce_id)

    if not state.active:
      # Failed shards take precedence over aborted shards when deciding the
      # final result status.
      if state.failed_shards or not total_shards:
        state.result_status = model.MapreduceState.RESULT_FAILED
      elif state.aborted_shards:
        state.result_status = model.MapreduceState.RESULT_ABORTED
      else:
        state.result_status = model.MapreduceState.RESULT_SUCCESS
      self._finalize_outputs(spec, state)
      self._finalize_job(spec, state)
    else:
      @db.transactional(retries=5)
      def _put_state():
        fresh_state = model.MapreduceState.get_by_job_id(spec.mapreduce_id)
        # Only write the state if the job is still active.
        if not fresh_state.active:
          logging.warning(
              "Job %s is not active. Looks like spurious task execution. "
              "Dropping controller task.", spec.mapreduce_id)
          return
        config = util.create_datastore_write_config(spec)
        state.put(config=config)

      _put_state()
  def serial_id(self):
    """Get serial unique identifier of this task from request.

    Returns:
      serial identifier as int.
    """
    return int(self.request.get("serial_id"))
  @classmethod
  def _finalize_outputs(cls, mapreduce_spec, mapreduce_state):
    """Finalize outputs.

    Args:
      mapreduce_spec: an instance of MapreduceSpec.
      mapreduce_state: an instance of MapreduceState.
    """
    # Only finalize the output writers if the job is successful.
    if (mapreduce_spec.mapper.output_writer_class() and
        mapreduce_state.result_status == model.MapreduceState.RESULT_SUCCESS):
      mapreduce_spec.mapper.output_writer_class().finalize_job(mapreduce_state)
  @classmethod
  def _finalize_job(cls, mapreduce_spec, mapreduce_state):
    """Finalize job execution.

    Invokes the done callback and saves mapreduce state in a transaction,
    and schedules necessary clean ups. This method is idempotent.

    Args:
      mapreduce_spec: an instance of MapreduceSpec
      mapreduce_state: an instance of MapreduceState
    """
    config = util.create_datastore_write_config(mapreduce_spec)
    queue_name = util.get_queue_name(mapreduce_spec.params.get(
        model.MapreduceSpec.PARAM_DONE_CALLBACK_QUEUE))
    done_callback = mapreduce_spec.params.get(
        model.MapreduceSpec.PARAM_DONE_CALLBACK)
    done_task = None
    if done_callback:
      done_task = taskqueue.Task(
          url=done_callback,
          headers=util._get_task_headers(mapreduce_spec,
                                         util.CALLBACK_MR_ID_TASK_HEADER),
          method=mapreduce_spec.params.get("done_callback_method", "POST"))

    @db.transactional(retries=5)
    def _put_state():
      fresh_state = model.MapreduceState.get_by_job_id(
          mapreduce_spec.mapreduce_id)
      if not fresh_state.active:
        logging.warning(
            "Job %s is not active. Looks like spurious task execution. "
            "Dropping task.", mapreduce_spec.mapreduce_id)
        return
      mapreduce_state.put(config=config)
      # Enqueue done_callback if needed.
      if done_task and not _run_task_hook(
          mapreduce_spec.get_hooks(),
          "enqueue_done_task",
          done_task,
          queue_name):
        done_task.add(queue_name, transactional=True)

    _put_state()
    logging.info("Final result for job '%s' is '%s'",
                 mapreduce_spec.mapreduce_id, mapreduce_state.result_status)
    cls._clean_up_mr(mapreduce_spec)
  @classmethod
  def _clean_up_mr(cls, mapreduce_spec):
    FinalizeJobHandler.schedule(mapreduce_spec)
  @staticmethod
  def get_task_name(mapreduce_spec, serial_id):
    """Compute single controller task name.

    Args:
      mapreduce_spec: specification of the mapreduce.
      serial_id: id of the invocation as int.

    Returns:
      task name which should be used to process the specified invocation.
    """
    # Prefix the task name with something unique to this framework's
    # namespace so that it doesn't conflict with user tasks on the queue.
    return "appengine-mrcontrol-%s-%s" % (
        mapreduce_spec.mapreduce_id, serial_id)
  @staticmethod
  def controller_parameters(mapreduce_spec, serial_id):
    """Fill in controller task parameters.

    The returned parameters map is to be used as task payload, and it
    contains all the data required by the controller to perform its function.

    Args:
      mapreduce_spec: specification of the mapreduce.
      serial_id: id of the invocation as int.

    Returns:
      string->string map of parameters to be used as task payload.
    """
    return {"mapreduce_spec": mapreduce_spec.to_json_str(),
            "serial_id": str(serial_id)}
  @classmethod
  def reschedule(cls,
                 mapreduce_state,
                 mapreduce_spec,
                 serial_id,
                 queue_name=None):
    """Schedule new update status callback task.

    Args:
      mapreduce_state: mapreduce state as model.MapreduceState
      mapreduce_spec: mapreduce specification as MapreduceSpec.
      serial_id: id of the invocation as int.
      queue_name: The queue to schedule this task on. Will use the current
        queue of execution if not supplied.
    """
    task_name = ControllerCallbackHandler.get_task_name(
        mapreduce_spec, serial_id)
    task_params = ControllerCallbackHandler.controller_parameters(
        mapreduce_spec, serial_id)
    if not queue_name:
      queue_name = os.environ.get("HTTP_X_APPENGINE_QUEUENAME", "default")

    controller_callback_task = model.HugeTask(
        url=(mapreduce_spec.params["base_path"] + "/controller_callback/" +
             mapreduce_spec.mapreduce_id),
        name=task_name, params=task_params,
        countdown=parameters.config._CONTROLLER_PERIOD_SEC,
        parent=mapreduce_state,
        headers=util._get_task_headers(mapreduce_spec))

    if not _run_task_hook(mapreduce_spec.get_hooks(),
                          "enqueue_controller_task",
                          controller_callback_task,
                          queue_name):
      try:
        controller_callback_task.add(queue_name)
      except (taskqueue.TombstonedTaskError,
              taskqueue.TaskAlreadyExistsError), e:
        logging.warning("Task %r with params %r already exists. %s: %s",
                        task_name, task_params, e.__class__, e)

class KickOffJobHandler(base_handler.TaskQueueHandler):
  """Taskqueue handler which kicks off a mapreduce processing.

  This handler is idempotent.

  Precondition:
    The Model.MapreduceState entity for this mr is already created and
    saved to datastore by StartJobHandler._start_map.

  Request Parameters:
    mapreduce_id: in string.
  """

  # The key name format used to store serialized input readers.
  _SERIALIZED_INPUT_READERS_KEY = "input_readers_for_mr_%s"
  def handle(self):
    """Handles kick off request."""
    # Get and verify mr state.
    mr_id = self.request.get("mapreduce_id")
    # Log the mr_id to make debugging easier.
    logging.info("Processing kickoff for job %s", mr_id)
    state = model.MapreduceState.get_by_job_id(mr_id)
    if not self._check_mr_state(state, mr_id):
      return

    # Create input readers.
    readers, serialized_readers_entity = self._get_input_readers(state)
    if readers is None:
      # We don't have any data. Finish the map.
      logging.warning("Found no mapper input data to process.")
      state.active = False
      state.result_status = model.MapreduceState.RESULT_SUCCESS
      ControllerCallbackHandler._finalize_job(
          state.mapreduce_spec, state)
      return False

    # Create output writers.
    self._setup_output_writer(state)

    # Save states; on a replayed transaction, re-fetch the saved readers.
    result = self._save_states(state, serialized_readers_entity)
    if result is None:
      readers, _ = self._get_input_readers(state)
    elif not result:
      return

    queue_name = self.request.headers.get("X-AppEngine-QueueName")
    KickOffJobHandler._schedule_shards(state.mapreduce_spec, readers,
                                       queue_name,
                                       state.mapreduce_spec.params["base_path"],
                                       state)

    ControllerCallbackHandler.reschedule(
        state, state.mapreduce_spec, serial_id=0, queue_name=queue_name)
  def _get_input_readers(self, state):
    """Get input readers.

    Args:
      state: a MapreduceState model.

    Returns:
      A tuple: (a list of input readers, a model._HugeTaskPayload entity).
      The payload entity contains the json serialized input readers.
      (None, None) when input reader splitting returned no data to process.
    """
    serialized_input_readers_key = (self._SERIALIZED_INPUT_READERS_KEY %
                                    state.key().id_or_name())
    serialized_input_readers = model._HugeTaskPayload.get_by_key_name(
        serialized_input_readers_key, parent=state)

    # Initialize input readers.
    input_reader_class = state.mapreduce_spec.mapper.input_reader_class()
    if serialized_input_readers is None:
      readers = input_reader_class.split_input(
          state.mapreduce_spec.mapper)
    else:
      readers = [input_reader_class.from_json_str(json) for json in
                 simplejson.loads(serialized_input_readers.payload)]

    if not readers:
      return None, None

    # Update state and spec with the actual shard count.
    state.mapreduce_spec.mapper.shard_count = len(readers)
    state.active_shards = len(readers)

    # Prepare the serialized readers entity if it does not exist yet.
    if serialized_input_readers is None:
      # Use mr_state as parent so it can be easily cleaned up later.
      serialized_input_readers = model._HugeTaskPayload(
          key_name=serialized_input_readers_key, parent=state)
      readers_json_str = [i.to_json_str() for i in readers]
      serialized_input_readers.payload = simplejson.dumps(readers_json_str)
    return readers, serialized_input_readers
  def _setup_output_writer(self, state):
    if not state.writer_state:
      output_writer_class = state.mapreduce_spec.mapper.output_writer_class()
      if output_writer_class:
        output_writer_class.init_job(state)
  @db.transactional
  def _save_states(self, state, serialized_readers_entity):
    """Run transaction to save state.

    Args:
      state: a model.MapreduceState entity.
      serialized_readers_entity: a model._HugeTaskPayload entity containing
        json serialized input readers.

    Returns:
      False if a fatal error is encountered and this task should be dropped
      immediately. True if transaction is successful. None if a previous
      attempt of this same transaction has already succeeded.
    """
    mr_id = state.key().id_or_name()
    fresh_state = model.MapreduceState.get_by_job_id(mr_id)
    if not self._check_mr_state(fresh_state, mr_id):
      return False
    if fresh_state.active_shards != 0:
      logging.warning(
          "Mapreduce %s already has active shards. Looks like spurious task "
          "execution.", mr_id)
      return None
    config = util.create_datastore_write_config(state.mapreduce_spec)
    db.put([state, serialized_readers_entity], config=config)
    return True
  @classmethod
  def _schedule_shards(cls,
                       spec,
                       readers,
                       queue_name,
                       base_path,
                       mr_state):
    """Prepares shard states and schedules their execution.

    Even though this method does not schedule shard tasks and save shard
    states transactionally, it's safe for taskqueue to retry this logic
    because the initial shard_state for each shard is the same from any retry.
    This is an important yet reasonable assumption on model.ShardState.

    Args:
      spec: mapreduce specification as MapreduceSpec.
      readers: list of InputReaders describing shard splits.
      queue_name: The queue to run this job on.
      base_path: The base url path of mapreduce callbacks.
      mr_state: The MapReduceState of current job.
    """
    # Create shard states.
    shard_states = []
    for shard_number, input_reader in enumerate(readers):
      shard_state = model.ShardState.create_new(spec.mapreduce_id, shard_number)
      shard_state.shard_description = str(input_reader)
      shard_states.append(shard_state)

    # Retrieve already existing shard states.
    existing_shard_states = db.get(shard.key() for shard in shard_states)
    existing_shard_keys = set(shard.key() for shard in existing_shard_states
                              if shard is not None)

    # Save only the shard states that do not exist yet.
    db.put((shard for shard in shard_states
            if shard.key() not in existing_shard_keys),
           config=util.create_datastore_write_config(spec))

    # Create output writers.
    writer_class = spec.mapper.output_writer_class()
    writers = [None] * len(readers)
    if writer_class:
      for shard_number, shard_state in enumerate(shard_states):
        writers[shard_number] = writer_class.create(
            mr_state.mapreduce_spec,
            shard_state.shard_number, shard_state.retries + 1,
            mr_state.writer_state)

    # Schedule ALL shard tasks. Since each task is named, taskqueue
    # deduplicates and won't schedule the same shard twice.
    for shard_number, (input_reader, output_writer) in enumerate(
        zip(readers, writers)):
      shard_id = model.ShardState.shard_id_from_number(
          spec.mapreduce_id, shard_number)
      task = MapperWorkerCallbackHandler._state_to_task(
          model.TransientShardState(
              base_path, spec, shard_id, 0, input_reader, input_reader,
              output_writer=output_writer,
              handler=spec.mapper.handler),
          shard_states[shard_number])
      MapperWorkerCallbackHandler._add_task(task,
                                            spec,
                                            queue_name)
  @classmethod
  def _check_mr_state(cls, state, mr_id):
    """Check MapreduceState.

    Args:
      state: a MapreduceState instance.
      mr_id: mapreduce id.

    Returns:
      True if state is valid. False if not and this task should be dropped.
    """
    if state is None:
      logging.warning(
          "Mapreduce State for job %s is missing. Dropping Task.",
          mr_id)
      return False
    if not state.active:
      logging.warning(
          "Mapreduce %s is not active. Looks like spurious task "
          "execution. Dropping Task.", mr_id)
      return False
    return True

class StartJobHandler(base_handler.PostJsonHandler):
  """Command handler that starts a mapreduce job.

  This handler allows a user to start an mr via a web form. Its _start_map
  method can also be used independently to start a mapreduce.
  """

  def handle(self):
    """Handles start request."""
    # Mapper spec as form arguments.
    mapreduce_name = self._get_required_param("name")
    mapper_input_reader_spec = self._get_required_param("mapper_input_reader")
    mapper_handler_spec = self._get_required_param("mapper_handler")
    mapper_output_writer_spec = self.request.get("mapper_output_writer")
    mapper_params = self._get_params(
        "mapper_params_validator", "mapper_params.")
    params = self._get_params(
        "params_validator", "params.")
    if "base_path" not in params:
      params["base_path"] = parameters.config.BASE_PATH

    # Set some mapper param defaults if not present.
    mapper_params["processing_rate"] = int(mapper_params.get(
        "processing_rate") or parameters.config.PROCESSING_RATE_PER_SEC)
    queue_name = mapper_params["queue_name"] = util.get_queue_name(
        mapper_params.get("queue_name", None))

    # Build the mapper spec from the form arguments.
    mapper_spec = model.MapperSpec(
        mapper_handler_spec,
        mapper_input_reader_spec,
        mapper_params,
        int(mapper_params.get("shard_count", parameters.config.SHARD_COUNT)),
        output_writer_spec=mapper_output_writer_spec)

    mapreduce_id = type(self)._start_map(
        mapreduce_name,
        mapper_spec,
        params,
        queue_name=queue_name,
        _app=mapper_params.get("_app"))
    self.json_response["mapreduce_id"] = mapreduce_id
  def _get_params(self, validator_parameter, name_prefix):
    """Retrieves additional user-supplied params for the job and validates
    them.

    Args:
      validator_parameter: name of the request parameter which supplies
        validator for this parameter set.
      name_prefix: common prefix for all parameter names in the request.

    Raises:
      Any exception raised by the 'params_validator' request parameter if
      the params fail to validate.

    Returns:
      The user parameters.
    """
    params_validator = self.request.get(validator_parameter)

    user_params = {}
    for key in self.request.arguments():
      if key.startswith(name_prefix):
        values = self.request.get_all(key)
        adjusted_key = key[len(name_prefix):]
        if len(values) == 1:
          user_params[adjusted_key] = values[0]
        else:
          user_params[adjusted_key] = values

    if params_validator:
      resolved_validator = util.for_name(params_validator)
      resolved_validator(user_params)

    return user_params
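
  # Worked example (illustrative): a request carrying
  # mapper_params.shard_count=16 and mapper_params.tag=a&mapper_params.tag=b
  # yields {"shard_count": "16", "tag": ["a", "b"]} once the "mapper_params."
  # prefix is stripped; repeated keys become lists.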
  def _get_required_param(self, param_name):
    """Get a required request parameter.

    Args:
      param_name: name of request parameter to fetch.

    Returns:
      parameter value

    Raises:
      errors.NotEnoughArgumentsError: if parameter is not specified.
    """
    value = self.request.get(param_name)
    if not value:
      raise errors.NotEnoughArgumentsError(param_name + " not specified")
    return value
  @classmethod
  def _start_map(cls,
                 name,
                 mapper_spec,
                 mapreduce_params,
                 queue_name,
                 eta=None,
                 countdown=None,
                 hooks_class_name=None,
                 _app=None,
                 in_xg_transaction=False):
    """See control.start_map.

    Requirements for this method:
    1. The request that invokes this method can either be regular or
       from taskqueue. So taskqueue specific headers can not be used.
    2. Each invocation transactionally starts an isolated mapreduce job with
       a unique id. MapreduceState should be immediately available after
       returning. See control.start_map's doc on transactional.
    3. Method should be lightweight.
    """
    # Validate input reader.
    mapper_input_reader_class = mapper_spec.input_reader_class()
    mapper_input_reader_class.validate(mapper_spec)

    # Validate output writer.
    mapper_output_writer_class = mapper_spec.output_writer_class()
    if mapper_output_writer_class:
      mapper_output_writer_class.validate(mapper_spec)

    # Create a new id and mr spec.
    mapreduce_id = model.MapreduceState.new_mapreduce_id()
    mapreduce_spec = model.MapreduceSpec(
        name,
        mapreduce_id,
        mapper_spec.to_json(),
        mapreduce_params,
        hooks_class_name)

    # Check that the handler can be resolved.
    ctx = context.Context(mapreduce_spec, None)
    context.Context._set(ctx)
    try:
      mapper_spec.handler
    finally:
      context.Context._set(None)

    # Save states and enqueue the kickoff task.
    if in_xg_transaction:
      propagation = db.MANDATORY
    else:
      propagation = db.INDEPENDENT

    @db.transactional(propagation=propagation)
    def _txn():
      cls._create_and_save_state(mapreduce_spec, _app)
      cls._add_kickoff_task(mapreduce_params["base_path"], mapreduce_spec, eta,
                            countdown, queue_name)
    _txn()

    return mapreduce_id
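
  # Minimal usage sketch (illustrative, not SDK documentation): callers
  # normally go through control.start_map, which delegates here. Something
  # like:
  #
  #   mapreduce_id = StartJobHandler._start_map(
  #       "WordCount",
  #       mapper_spec,                  # a model.MapperSpec
  #       {"base_path": "/mapreduce"},
  #       queue_name="default")
  #
  # would validate the spec, save a fresh MapreduceState, and enqueue the
  # kickoff task in one independent transaction.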
  @classmethod
  def _create_and_save_state(cls, mapreduce_spec, _app):
    """Save mapreduce state to datastore.

    Save state to datastore so that UI can see it immediately.

    Args:
      mapreduce_spec: model.MapreduceSpec,
      _app: app id if specified. None otherwise.

    Returns:
      The saved Mapreduce state.
    """
    state = model.MapreduceState.create_new(mapreduce_spec.mapreduce_id)
    state.mapreduce_spec = mapreduce_spec
    state.active = True
    state.active_shards = 0
    if _app:
      state.app_id = _app
    config = util.create_datastore_write_config(mapreduce_spec)
    state.put(config=config)
    return state
  @classmethod
  def _add_kickoff_task(cls,
                        base_path,
                        mapreduce_spec,
                        eta,
                        countdown,
                        queue_name):
    """Enqueues a new kickoff task, preferring user-supplied hooks."""
    params = {"mapreduce_id": mapreduce_spec.mapreduce_id}
    # The task is not named so that it can be added within a transaction.
    kickoff_task = taskqueue.Task(
        url=base_path + "/kickoffjob_callback/" + mapreduce_spec.mapreduce_id,
        headers=util._get_task_headers(mapreduce_spec),
        params=params,
        eta=eta,
        countdown=countdown)
    hooks = mapreduce_spec.get_hooks()
    if hooks is not None:
      try:
        hooks.enqueue_kickoff_task(kickoff_task, queue_name)
        return
      except NotImplementedError:
        pass
    kickoff_task.add(queue_name, transactional=True)

class FinalizeJobHandler(base_handler.TaskQueueHandler):
  """Finalize map job by deleting all temporary entities."""

  def handle(self):
    mapreduce_id = self.request.get("mapreduce_id")
    mapreduce_state = model.MapreduceState.get_by_job_id(mapreduce_id)
    if mapreduce_state:
      config = util.create_datastore_write_config(
          mapreduce_state.mapreduce_spec)
      keys = [model.MapreduceControl.get_key_by_job_id(mapreduce_id)]
      for ss in model.ShardState.find_all_by_mapreduce_state(mapreduce_state):
        keys.extend(list(
            model._HugeTaskPayload.all().ancestor(ss).run(keys_only=True)))
      keys.extend(list(model._HugeTaskPayload.all().ancestor(
          mapreduce_state).run(keys_only=True)))
      db.delete(keys, config=config)
  @classmethod
  def schedule(cls, mapreduce_spec):
    """Schedule finalize task.

    Args:
      mapreduce_spec: mapreduce specification as MapreduceSpec.
    """
    task_name = mapreduce_spec.mapreduce_id + "-finalize"
    finalize_task = taskqueue.Task(
        name=task_name,
        url=(mapreduce_spec.params["base_path"] + "/finalizejob_callback/" +
             mapreduce_spec.mapreduce_id),
        params={"mapreduce_id": mapreduce_spec.mapreduce_id},
        headers=util._get_task_headers(mapreduce_spec))
    queue_name = util.get_queue_name(None)
    if not _run_task_hook(mapreduce_spec.get_hooks(),
                          "enqueue_controller_task",
                          finalize_task,
                          queue_name):
      try:
        finalize_task.add(queue_name)
      except (taskqueue.TombstonedTaskError,
              taskqueue.TaskAlreadyExistsError), e:
        logging.warning("Task %r already exists. %s: %s",
                        task_name, e.__class__, e)

class CleanUpJobHandler(base_handler.PostJsonHandler):
  """Command to kick off tasks to clean up a job's data."""

  def handle(self):
    mapreduce_id = self.request.get("mapreduce_id")

    mapreduce_state = model.MapreduceState.get_by_job_id(mapreduce_id)
    if mapreduce_state:
      shard_keys = model.ShardState.calculate_keys_by_mapreduce_state(
          mapreduce_state)
      db.delete(shard_keys)
      db.delete(mapreduce_state)
    self.json_response["status"] = ("Job %s successfully cleaned up." %
                                    mapreduce_id)

class AbortJobHandler(base_handler.PostJsonHandler):
  """Command to abort a running job."""

  def handle(self):
    model.MapreduceControl.abort(self.request.get("mapreduce_id"))
    self.json_response["status"] = "Abort signal sent."