python/google/appengine/ext/mapreduce/handlers.py
1 #!/usr/bin/env python
3 # Copyright 2007 Google Inc.
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
9 # http://www.apache.org/licenses/LICENSE-2.0
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
32 """Defines executor tasks handlers for MapReduce implementation."""
39 import datetime
40 import logging
41 import math
42 import os
43 import random
44 import sys
45 import time
46 import traceback
47 import simplejson
49 from google.appengine.ext import ndb
51 from google.appengine import runtime
52 from google.appengine.api import datastore_errors
53 from google.appengine.api import logservice
54 from google.appengine.api import modules
55 from google.appengine.api import taskqueue
56 from google.appengine.ext import db
57 from google.appengine.ext.mapreduce import base_handler
58 from google.appengine.ext.mapreduce import context
59 from google.appengine.ext.mapreduce import errors
60 from google.appengine.ext.mapreduce import input_readers
61 from google.appengine.ext.mapreduce import map_job_context
62 from google.appengine.ext.mapreduce import model
63 from google.appengine.ext.mapreduce import operation
64 from google.appengine.ext.mapreduce import output_writers
65 from google.appengine.ext.mapreduce import parameters
66 from google.appengine.ext.mapreduce import shard_life_cycle
67 from google.appengine.ext.mapreduce import util
68 from google.appengine.ext.mapreduce.api import map_job
69 from google.appengine.runtime import apiproxy_errors
72 try:
73 from google.appengine.ext import cloudstorage
78 if hasattr(cloudstorage, "_STUB"):
79 cloudstorage = None
80 except ImportError:
81 cloudstorage = None
93 _TEST_INJECTED_FAULTS = set()
96 def _run_task_hook(hooks, method, task, queue_name):
97 """Invokes hooks.method(task, queue_name).
99 Args:
100 hooks: A hooks.Hooks instance or None.
101 method: The name of the method to invoke on the hooks class e.g.
102 "enqueue_kickoff_task".
103 task: The taskqueue.Task to pass to the hook method.
104 queue_name: The name of the queue to pass to the hook method.
106 Returns:
107 True if the hooks.Hooks instance handled the method, False otherwise.
108 """
109 if hooks is not None:
110 try:
111 getattr(hooks, method)(task, queue_name)
112 except NotImplementedError:
114 return False
116 return True
117 return False
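# Illustrative sketch (not part of the original module; class and queue names
# are hypothetical): a hooks object takes over enqueueing by implementing the
# named method with the (task, queue_name) signature used above; raising
# NotImplementedError makes _run_task_hook return False so the caller falls
# back to the default enqueue path.
#
# class WorkerQueueHooks(object):
#   def enqueue_worker_task(self, task, queue_name):
#     # Route every worker task to a dedicated queue instead of queue_name.
#     task.add("mapreduce-worker-queue")
#
#   def enqueue_kickoff_task(self, task, queue_name):
#     # Not handled here; the caller will enqueue the task itself.
#     raise NotImplementedError()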
120 class MapperWorkerCallbackHandler(base_handler.HugeTaskHandler):
121 """Callback handler for mapreduce worker task."""
124 _TASK_DIRECTIVE = util._enum(
126 PROCEED_TASK="proceed_task",
129 RETRY_TASK="retry_task",
132 RETRY_SLICE="retry_slice",
134 DROP_TASK="drop_task",
136 RECOVER_SLICE="recover_slice",
138 RETRY_SHARD="retry_shard",
140 FAIL_TASK="fail_task",
142 ABORT_SHARD="abort_shard")
144 def __init__(self, *args):
145 """Constructor."""
146 super(MapperWorkerCallbackHandler, self).__init__(*args)
147 self._time = time.time
148 self.slice_context = None
149 self.shard_context = None
151 def _drop_gracefully(self):
152 """Drop worker task gracefully.
154 Set current shard_state to failed. Controller logic will take care of
155 other shards and the entire MR.
156 """
157 shard_id = self.request.headers[util._MR_SHARD_ID_TASK_HEADER]
158 mr_id = self.request.headers[util._MR_ID_TASK_HEADER]
159 shard_state, mr_state = db.get([
160 model.ShardState.get_key_by_shard_id(shard_id),
161 model.MapreduceState.get_key_by_job_id(mr_id)])
163 if shard_state and shard_state.active:
164 shard_state.set_for_failure()
165 config = util.create_datastore_write_config(mr_state.mapreduce_spec)
166 shard_state.put(config=config)
168 def _try_acquire_lease(self, shard_state, tstate):
169 """Validate datastore and the task payload are consistent.
171 If so, attempt to get a lease on this slice's execution.
172 See model.ShardState doc on slice_start_time.
174 Args:
175 shard_state: model.ShardState from datastore.
176 tstate: model.TransientShardState from taskqueue payload.
178 Returns:
179 A _TASK_DIRECTIVE enum. PROCEED_TASK if lock is acquired.
180 RETRY_TASK if task should be retried, DROP_TASK if task should
181 be dropped. Only tasks that are old (compared to datastore state)
182 will be dropped. Tasks from the future are retried until they naturally
183 become old, so that the MR never gets stuck.
184 """
186 if not shard_state:
187 logging.warning("State not found for shard %s; Possible spurious task "
188 "execution. Dropping this task.",
189 tstate.shard_id)
190 return self._TASK_DIRECTIVE.DROP_TASK
192 if not shard_state.active:
193 logging.warning("Shard %s is not active. Possible spurious task "
194 "execution. Dropping this task.", tstate.shard_id)
195 logging.warning(str(shard_state))
196 return self._TASK_DIRECTIVE.DROP_TASK
199 if shard_state.retries > tstate.retries:
200 logging.warning(
201 "Got shard %s from previous shard retry %s. Possible spurious "
202 "task execution. Dropping this task.",
203 tstate.shard_id,
204 tstate.retries)
205 logging.warning(str(shard_state))
206 return self._TASK_DIRECTIVE.DROP_TASK
207 elif shard_state.retries < tstate.retries:
211 logging.warning(
212 "ShardState for %s is behind slice. Waiting for it to catch up",
213 shard_state.shard_id)
214 return self._TASK_DIRECTIVE.RETRY_TASK
218 if shard_state.slice_id > tstate.slice_id:
219 logging.warning(
220 "Task %s-%s is behind ShardState %s. Dropping task.""",
221 tstate.shard_id, tstate.slice_id, shard_state.slice_id)
222 return self._TASK_DIRECTIVE.DROP_TASK
226 elif shard_state.slice_id < tstate.slice_id:
227 logging.warning(
228 "Task %s-%s is ahead of ShardState %s. Waiting for it to catch up.",
229 tstate.shard_id, tstate.slice_id, shard_state.slice_id)
230 return self._TASK_DIRECTIVE.RETRY_TASK
234 if shard_state.slice_start_time:
235 countdown = self._wait_time(shard_state,
236 parameters._LEASE_DURATION_SEC)
237 if countdown > 0:
238 logging.warning(
239 "Last retry of slice %s-%s may be still running."
240 "Will try again in %s seconds", tstate.shard_id, tstate.slice_id,
241 countdown)
245 time.sleep(countdown)
246 return self._TASK_DIRECTIVE.RETRY_TASK
248 else:
249 if self._wait_time(shard_state,
250 parameters._MAX_LEASE_DURATION_SEC):
251 if not self._has_old_request_ended(shard_state):
252 logging.warning(
253 "Last retry of slice %s-%s is still in flight with request_id "
254 "%s. Will try again later.", tstate.shard_id, tstate.slice_id,
255 shard_state.slice_request_id)
256 return self._TASK_DIRECTIVE.RETRY_TASK
257 else:
258 logging.warning(
259 "Last retry of slice %s-%s has no log entry and has"
260 "timed out after %s seconds",
261 tstate.shard_id, tstate.slice_id,
262 parameters._MAX_LEASE_DURATION_SEC)
265 config = util.create_datastore_write_config(tstate.mapreduce_spec)
266 @db.transactional(retries=5)
267 def _tx():
268 """Use datastore to set slice_start_time to now.
270 If failed for any reason, raise error to retry the task (hence all
271 the previous validation code). The task would die naturally eventually.
273 Raises:
274 Rollback: If the shard state is missing.
276 Returns:
277 A _TASK_DIRECTIVE enum.
278 """
279 fresh_state = model.ShardState.get_by_shard_id(tstate.shard_id)
280 if not fresh_state:
281 logging.warning("ShardState missing.")
282 raise db.Rollback()
283 if (fresh_state.active and
284 fresh_state.slice_id == shard_state.slice_id and
285 fresh_state.slice_start_time == shard_state.slice_start_time):
286 shard_state.slice_start_time = datetime.datetime.now()
287 shard_state.slice_request_id = os.environ.get("REQUEST_LOG_ID")
288 shard_state.acquired_once = True
289 shard_state.put(config=config)
290 return self._TASK_DIRECTIVE.PROCEED_TASK
291 else:
292 logging.warning(
293 "Contention on slice %s-%s execution. Will retry again.",
294 tstate.shard_id, tstate.slice_id)
296 time.sleep(random.randrange(1, 5))
297 return self._TASK_DIRECTIVE.RETRY_TASK
299 return _tx()
301 def _has_old_request_ended(self, shard_state):
302 """Whether previous slice retry has ended according to Logs API.
304 Args:
305 shard_state: shard state.
307 Returns:
308 True if the request of previous slice retry has ended. False if it has
309 not, or if it is unknown.
310 """
311 assert shard_state.slice_start_time is not None
312 assert shard_state.slice_request_id is not None
313 request_ids = [shard_state.slice_request_id]
314 logs = list(logservice.fetch(
315 request_ids=request_ids,
317 module_versions=[(os.environ["CURRENT_MODULE_ID"],
318 modules.get_current_version_name())]))
320 if not logs or not logs[0].finished:
321 return False
322 return True
324 def _wait_time(self, shard_state, secs, now=datetime.datetime.now):
325 """Time to wait until slice_start_time is secs ago from now.
327 Args:
328 shard_state: shard state.
329 secs: duration in seconds.
330 now: a func that gets now.
332 Returns:
333 0 if no wait. A positive int in seconds otherwise. Always rounds up.
334 """
335 assert shard_state.slice_start_time is not None
336 delta = now() - shard_state.slice_start_time
337 duration = datetime.timedelta(seconds=secs)
338 if delta < duration:
339 return util.total_seconds(duration - delta)
340 else:
341 return 0
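# Worked example (not part of the original module; the numbers are
# hypothetical): if slice_start_time was 12 seconds ago and secs=30, the
# remaining lease is 30 - 12 = 18 seconds, so _wait_time returns 18 (rounded
# up per the docstring). Once 30 or more seconds have elapsed it returns 0.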
343 def _try_free_lease(self, shard_state, slice_retry=False):
344 """Try to free lease.
346 A lightweight transaction to update shard_state and unset
347 slice_start_time to allow the next retry to happen without blocking.
348 We don't care if this fails or not because the lease will expire
349 anyway.
351 Under normal execution, _save_state_and_schedule_next is the exit point.
352 It updates/saves shard state and schedules the next slice or returns.
353 Other exit points are:
354 1. _are_states_consistent: at the beginning of handle, checks
355 if datastore states and the task are in sync.
356 If not, raise or return.
357 2. _attempt_slice_retry: may raise exception to taskqueue.
358 3. _save_state_and_schedule_next: may raise exception when taskqueue/db
359 unreachable.
361 This handler should try to free the lease on every exceptional exit point.
363 Args:
364 shard_state: model.ShardState.
365 slice_retry: whether to count this as a failed slice execution.
366 """
367 @db.transactional
368 def _tx():
369 fresh_state = model.ShardState.get_by_shard_id(shard_state.shard_id)
370 if fresh_state and fresh_state.active:
372 fresh_state.slice_start_time = None
373 fresh_state.slice_request_id = None
374 if slice_retry:
375 fresh_state.slice_retries += 1
376 fresh_state.put()
377 try:
378 _tx()
380 except Exception, e:
381 logging.warning(e)
382 logging.warning(
383 "Release lock for shard %s failed. Wait for lease to expire.",
384 shard_state.shard_id)
386 def _maintain_LC(self, obj, slice_id, last_slice=False, begin_slice=True,
387 shard_ctx=None, slice_ctx=None):
388 """Makes sure shard life cycle interface are respected.
390 Args:
391 obj: the obj that may have implemented _ShardLifeCycle.
392 slice_id: current slice_id
393 last_slice: whether this is the last slice.
394 begin_slice: whether this is the beginning or the end of a slice.
395 shard_ctx: shard ctx for dependency injection. If None, it will be read
396 from self.
397 slice_ctx: slice ctx for dependency injection. If None, it will be read
398 from self.
399 """
400 if obj is None or not isinstance(obj, shard_life_cycle._ShardLifeCycle):
401 return
403 shard_context = shard_ctx or self.shard_context
404 slice_context = slice_ctx or self.slice_context
405 if begin_slice:
406 if slice_id == 0:
407 obj.begin_shard(shard_context)
408 obj.begin_slice(slice_context)
409 else:
410 obj.end_slice(slice_context)
411 if last_slice:
412 obj.end_shard(shard_context)
414 def _lc_start_slice(self, tstate, slice_id):
415 self._maintain_LC(tstate.output_writer, slice_id)
416 self._maintain_LC(tstate.input_reader, slice_id)
417 self._maintain_LC(tstate.handler, slice_id)
419 def _lc_end_slice(self, tstate, slice_id, last_slice=False):
420 self._maintain_LC(tstate.handler, slice_id, last_slice=last_slice,
421 begin_slice=False)
422 self._maintain_LC(tstate.input_reader, slice_id, last_slice=last_slice,
423 begin_slice=False)
424 self._maintain_LC(tstate.output_writer, slice_id, last_slice=last_slice,
425 begin_slice=False)
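# Illustrative sketch (not part of the original module; the class is
# hypothetical): an input reader, output writer or handler opts into these
# callbacks by implementing shard_life_cycle._ShardLifeCycle. _maintain_LC
# then calls begin_shard on slice 0, begin_slice/end_slice around every
# slice, and end_shard after the last slice.
#
# class CountingWriter(shard_life_cycle._ShardLifeCycle):
#   def begin_shard(self, shard_ctx):
#     self.slices_seen = 0
#   def begin_slice(self, slice_ctx):
#     self.slices_seen += 1
#   def end_slice(self, slice_ctx):
#     pass
#   def end_shard(self, shard_ctx):
#     logging.info("Shard finished after %d slices.", self.slices_seen)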
427 def handle(self):
428 """Handle request.
430 This method has to be careful to pass the same ShardState instance to
431 its subroutine calls if the calls mutate or read from ShardState.
432 Note especially that Context instance caches and updates the ShardState
433 instance.
435 Returns:
436 Sets the HTTP status code and always returns None.
437 """
439 self._start_time = self._time()
440 shard_id = self.request.headers[util._MR_SHARD_ID_TASK_HEADER]
441 mr_id = self.request.headers[util._MR_ID_TASK_HEADER]
442 spec = model.MapreduceSpec._get_mapreduce_spec(mr_id)
443 shard_state, control = db.get([
444 model.ShardState.get_key_by_shard_id(shard_id),
445 model.MapreduceControl.get_key_by_job_id(mr_id),
446 ])
449 ctx = context.Context(spec, shard_state,
450 task_retry_count=self.task_retry_count())
451 context.Context._set(ctx)
454 tstate = model.TransientShardState.from_request(self.request)
457 if shard_state:
458 is_this_a_retry = shard_state.acquired_once
459 task_directive = self._try_acquire_lease(shard_state, tstate)
460 if task_directive in (self._TASK_DIRECTIVE.RETRY_TASK,
461 self._TASK_DIRECTIVE.DROP_TASK):
462 return self.__return(shard_state, tstate, task_directive)
463 assert task_directive == self._TASK_DIRECTIVE.PROCEED_TASK
466 if control and control.command == model.MapreduceControl.ABORT:
467 task_directive = self._TASK_DIRECTIVE.ABORT_SHARD
468 return self.__return(shard_state, tstate, task_directive)
471 if (is_this_a_retry and
472 parameters.config.TASK_MAX_DATA_PROCESSING_ATTEMPTS <= 1):
473 task_directive = self._TASK_DIRECTIVE.RETRY_SHARD
474 return self.__return(shard_state, tstate, task_directive)
478 util._set_ndb_cache_policy()
480 job_config = map_job.JobConfig._to_map_job_config(
481 spec,
482 os.environ.get("HTTP_X_APPENGINE_QUEUENAME"))
483 job_context = map_job_context.JobContext(job_config)
484 self.shard_context = map_job_context.ShardContext(job_context, shard_state)
485 self.slice_context = map_job_context.SliceContext(self.shard_context,
486 shard_state,
487 tstate)
488 try:
489 slice_id = tstate.slice_id
490 self._lc_start_slice(tstate, slice_id)
492 if shard_state.is_input_finished():
493 self._lc_end_slice(tstate, slice_id, last_slice=True)
495 if (tstate.output_writer and
496 isinstance(tstate.output_writer, output_writers.OutputWriter)):
501 tstate.output_writer.finalize(ctx, shard_state)
502 shard_state.set_for_success()
503 return self.__return(shard_state, tstate, task_directive)
505 if is_this_a_retry:
506 task_directive = self._attempt_slice_recovery(shard_state, tstate)
507 if task_directive != self._TASK_DIRECTIVE.PROCEED_TASK:
508 return self.__return(shard_state, tstate, task_directive)
510 last_slice = self._process_inputs(
511 tstate.input_reader, shard_state, tstate, ctx)
513 self._lc_end_slice(tstate, slice_id)
515 ctx.flush()
517 if last_slice:
521 shard_state.set_input_finished()
523 except Exception, e:
524 logging.warning("Shard %s got error.", shard_state.shard_id)
525 logging.error(traceback.format_exc())
528 if type(e) is errors.FailJobError:
529 logging.error("Got FailJobError.")
530 task_directive = self._TASK_DIRECTIVE.FAIL_TASK
531 else:
532 task_directive = self._TASK_DIRECTIVE.RETRY_SLICE
534 self.__return(shard_state, tstate, task_directive)
536 def __return(self, shard_state, tstate, task_directive):
537 """Handler should always call this as the last statement."""
538 task_directive = self._set_state(shard_state, tstate, task_directive)
539 self._save_state_and_schedule_next(shard_state, tstate, task_directive)
541 def _process_inputs(self,
542 input_reader,
543 shard_state,
544 tstate,
545 ctx):
546 """Read inputs, process them, and write out outputs.
548 This is the core logic of MapReduce. It reads inputs from input reader,
549 invokes user specified mapper function, and writes output with
550 output writer. It also updates shard_state accordingly.
551 e.g. if shard processing is done, set shard_state.active to False.
553 If errors.FailJobError is caught, it will fail this MR job.
554 All other exceptions will be logged and raised to taskqueue for retry
555 until the number of retries exceeds a limit.
557 Args:
558 input_reader: input reader.
559 shard_state: shard state.
560 tstate: transient shard state.
561 ctx: mapreduce context.
563 Returns:
564 Whether this shard has finished processing all its input split.
565 """
566 processing_limit = self._processing_limit(tstate.mapreduce_spec)
567 if processing_limit == 0:
568 return
570 finished_shard = True
572 iterator = iter(input_reader)
574 while True:
575 try:
576 entity = iterator.next()
577 except StopIteration:
578 break
590 if isinstance(entity, db.Model):
591 shard_state.last_work_item = repr(entity.key())
592 elif isinstance(entity, ndb.Model):
593 shard_state.last_work_item = repr(entity.key)
594 else:
595 shard_state.last_work_item = repr(entity)[:100]
597 processing_limit -= 1
599 if not self._process_datum(
600 entity, input_reader, ctx, tstate):
601 finished_shard = False
602 break
603 elif processing_limit == 0:
604 finished_shard = False
605 break
608 self.slice_context.incr(
609 context.COUNTER_MAPPER_WALLTIME_MS,
610 int((self._time() - self._start_time)*1000))
612 return finished_shard
614 def _process_datum(self, data, input_reader, ctx, transient_shard_state):
615 """Process a single data piece.
617 Call mapper handler on the data.
619 Args:
620 data: a datum to process.
621 input_reader: input reader.
622 ctx: mapreduce context
623 transient_shard_state: transient shard state.
625 Returns:
626 True if scan should be continued, False if scan should be stopped.
627 """
628 if data is not input_readers.ALLOW_CHECKPOINT:
629 self.slice_context.incr(context.COUNTER_MAPPER_CALLS)
631 handler = transient_shard_state.handler
633 if isinstance(handler, map_job.Mapper):
634 handler(self.slice_context, data)
635 else:
636 if input_reader.expand_parameters:
637 result = handler(*data)
638 else:
639 result = handler(data)
641 if util.is_generator(result):
642 for output in result:
643 if isinstance(output, operation.Operation):
644 output(ctx)
645 else:
646 output_writer = transient_shard_state.output_writer
647 if not output_writer:
648 logging.warning(
649 "Handler yielded %s, but no output writer is set.", output)
650 else:
651 output_writer.write(output)
653 if self._time() - self._start_time >= parameters.config._SLICE_DURATION_SEC:
654 return False
655 return True
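# Illustrative sketch (not part of the original module; the function and
# counter name are hypothetical): what a generator-style map handler may
# yield. Yielded operation.Operation instances are applied to the context;
# any other yielded value goes to the configured output writer.
#
# def word_count_map(entity):
#   # Bump a custom counter via an operation object.
#   yield operation.counters.Increment("entities-seen")
#   # Emit a value for the output writer.
#   yield "%s\n" % entity.key()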
657 def _set_state(self, shard_state, tstate, task_directive):
658 """Set shard_state and tstate based on task_directive.
660 Args:
661 shard_state: model.ShardState for current shard.
662 tstate: model.TransientShardState for current shard.
663 task_directive: self._TASK_DIRECTIVE for current shard.
665 Returns:
666 A _TASK_DIRECTIVE enum.
667 PROCEED_TASK if task should proceed normally.
668 RETRY_SHARD if shard should be retried.
669 RETRY_SLICE if slice should be retried.
670 FAIL_TASK if shard should fail.
671 RECOVER_SLICE if slice should be recovered.
672 ABORT_SHARD if shard should be aborted.
673 RETRY_TASK if task should be retried.
674 DROP_TASK if task should be dropped.
675 """
676 if task_directive in (self._TASK_DIRECTIVE.RETRY_TASK,
677 self._TASK_DIRECTIVE.DROP_TASK):
678 return task_directive
680 if task_directive == self._TASK_DIRECTIVE.ABORT_SHARD:
681 shard_state.set_for_abort()
682 return task_directive
684 if task_directive == self._TASK_DIRECTIVE.PROCEED_TASK:
685 shard_state.advance_for_next_slice()
686 tstate.advance_for_next_slice()
687 return task_directive
689 if task_directive == self._TASK_DIRECTIVE.RECOVER_SLICE:
690 tstate.advance_for_next_slice(recovery_slice=True)
691 shard_state.advance_for_next_slice(recovery_slice=True)
692 return task_directive
694 if task_directive == self._TASK_DIRECTIVE.RETRY_SLICE:
695 task_directive = self._attempt_slice_retry(shard_state, tstate)
696 if task_directive == self._TASK_DIRECTIVE.RETRY_SHARD:
697 task_directive = self._attempt_shard_retry(shard_state, tstate)
698 if task_directive == self._TASK_DIRECTIVE.FAIL_TASK:
699 shard_state.set_for_failure()
701 return task_directive
703 def _save_state_and_schedule_next(self, shard_state, tstate, task_directive):
704 """Save state and schedule task.
706 Save shard state to datastore.
707 Schedule next slice if needed.
708 Set HTTP response code.
709 No modification to any shard_state or tstate.
711 Args:
712 shard_state: model.ShardState for current shard.
713 tstate: model.TransientShardState for current shard.
714 task_directive: enum _TASK_DIRECTIVE.
716 Returns:
717 The task to retry if applicable.
718 """
719 spec = tstate.mapreduce_spec
721 if task_directive == self._TASK_DIRECTIVE.DROP_TASK:
722 return
723 if task_directive in (self._TASK_DIRECTIVE.RETRY_SLICE,
724 self._TASK_DIRECTIVE.RETRY_TASK):
726 return self.retry_task()
727 elif task_directive == self._TASK_DIRECTIVE.ABORT_SHARD:
728 logging.info("Aborting shard %d of job '%s'",
729 shard_state.shard_number, shard_state.mapreduce_id)
730 task = None
731 elif task_directive == self._TASK_DIRECTIVE.FAIL_TASK:
732 logging.critical("Shard %s failed permanently.", shard_state.shard_id)
733 task = None
734 elif task_directive == self._TASK_DIRECTIVE.RETRY_SHARD:
735 logging.warning("Shard %s is going to be attempted for the %s time.",
736 shard_state.shard_id,
737 shard_state.retries + 1)
738 task = self._state_to_task(tstate, shard_state)
739 elif task_directive == self._TASK_DIRECTIVE.RECOVER_SLICE:
740 logging.warning("Shard %s slice %s is being recovered.",
741 shard_state.shard_id,
742 shard_state.slice_id)
743 task = self._state_to_task(tstate, shard_state)
744 else:
745 assert task_directive == self._TASK_DIRECTIVE.PROCEED_TASK
746 countdown = self._get_countdown_for_next_slice(spec)
747 task = self._state_to_task(tstate, shard_state, countdown=countdown)
750 queue_name = os.environ.get("HTTP_X_APPENGINE_QUEUENAME",
753 "default")
754 config = util.create_datastore_write_config(spec)
756 @db.transactional(retries=5)
757 def _tx():
758 """The Transaction helper."""
759 fresh_shard_state = model.ShardState.get_by_shard_id(tstate.shard_id)
760 if not fresh_shard_state:
761 raise db.Rollback()
762 if (not fresh_shard_state.active or
763 "worker_active_state_collision" in _TEST_INJECTED_FAULTS):
764 logging.warning("Shard %s is not active. Possible spurious task "
765 "execution. Dropping this task.", tstate.shard_id)
766 logging.warning("Datastore's %s", str(fresh_shard_state))
767 logging.warning("Slice's %s", str(shard_state))
768 return
769 fresh_shard_state.copy_from(shard_state)
770 fresh_shard_state.put(config=config)
775 if fresh_shard_state.active:
778 self._add_task(task, spec, queue_name)
780 try:
781 _tx()
782 except (datastore_errors.Error,
783 taskqueue.Error,
784 runtime.DeadlineExceededError,
785 apiproxy_errors.Error), e:
786 logging.warning(
787 "Can't transactionally continue shard. "
788 "Will retry slice %s %s for the %s time.",
789 tstate.shard_id,
790 tstate.slice_id,
791 self.task_retry_count() + 1)
792 self._try_free_lease(shard_state)
793 raise e
795 def _attempt_slice_recovery(self, shard_state, tstate):
796 """Recover a slice.
798 This is run when a slice had been previously attempted and output
799 may have been written. If an output writer requires slice recovery,
800 we run that logic to remove output duplicates. Otherwise we just retry
801 the slice.
803 If recovery is needed, then the entire slice will be dedicated
804 to recovery logic. No data processing will take place. Thus we call
805 the slice "recovery slice". This is needed for correctness:
806 An output writer instance can be out of sync from its physical
807 medium only when the slice dies after acquiring the shard lock but before
808 committing shard state to db. The worst failure case is when
809 shard state failed to commit after the NAMED task for the next slice was
810 added. Thus, recovery slice has a special logic to increment current
811 slice_id n to n+2. If the task for n+1 had been added, it will be dropped
812 because it is behind shard state.
814 Args:
815 shard_state: an instance of Model.ShardState.
816 tstate: an instance of Model.TransientShardState.
818 Returns:
819 _TASK_DIRECTIVE.PROCEED_TASK to continue with this retry.
820 _TASK_DIRECTIVE.RECOVER_SLICE to recover this slice.
821 The next slice will start at the same input as
822 this slice but output to a new instance of output writer.
823 Combining outputs from all writer instances is up to implementation.
824 """
825 mapper_spec = tstate.mapreduce_spec.mapper
826 if not (tstate.output_writer and
827 tstate.output_writer._supports_slice_recovery(mapper_spec)):
828 return self._TASK_DIRECTIVE.PROCEED_TASK
830 tstate.output_writer = tstate.output_writer._recover(
831 tstate.mapreduce_spec, shard_state.shard_number,
832 shard_state.retries + 1)
833 return self._TASK_DIRECTIVE.RECOVER_SLICE
835 def _attempt_shard_retry(self, shard_state, tstate):
836 """Whether to retry shard.
838 This method may modify shard_state and tstate to prepare for retry or fail.
840 Args:
841 shard_state: model.ShardState for current shard.
842 tstate: model.TransientShardState for current shard.
844 Returns:
845 A _TASK_DIRECTIVE enum. RETRY_SHARD if shard should be retried.
846 FAIL_TASK otherwise.
847 """
848 shard_attempts = shard_state.retries + 1
850 if shard_attempts >= parameters.config.SHARD_MAX_ATTEMPTS:
851 logging.warning(
852 "Shard attempt %s exceeded %s max attempts.",
853 shard_attempts, parameters.config.SHARD_MAX_ATTEMPTS)
854 return self._TASK_DIRECTIVE.FAIL_TASK
855 if tstate.output_writer and (
856 not tstate.output_writer._supports_shard_retry(tstate)):
857 logging.warning("Output writer %s does not support shard retry.",
858 tstate.output_writer.__class__.__name__)
859 return self._TASK_DIRECTIVE.FAIL_TASK
861 shard_state.reset_for_retry()
862 logging.warning("Shard %s attempt %s failed with up to %s attempts.",
863 shard_state.shard_id,
864 shard_state.retries,
865 parameters.config.SHARD_MAX_ATTEMPTS)
866 output_writer = None
867 if tstate.output_writer:
868 output_writer = tstate.output_writer.create(
869 tstate.mapreduce_spec, shard_state.shard_number, shard_attempts + 1)
870 tstate.reset_for_retry(output_writer)
871 return self._TASK_DIRECTIVE.RETRY_SHARD
873 def _attempt_slice_retry(self, shard_state, tstate):
874 """Attempt to retry this slice.
876 This method may modify shard_state and tstate to prepare for retry or fail.
878 Args:
879 shard_state: model.ShardState for current shard.
880 tstate: model.TransientShardState for current shard.
882 Returns:
883 A _TASK_DIRECTIVE enum. RETRY_SLICE if slice should be retried.
884 RETRY_SHARD if shard retry should be attempted.
885 """
886 if (shard_state.slice_retries + 1 <
887 parameters.config.TASK_MAX_DATA_PROCESSING_ATTEMPTS):
888 logging.warning(
889 "Slice %s %s failed for the %s of up to %s attempts "
890 "(%s of %s taskqueue execution attempts). "
891 "Will retry now.",
892 tstate.shard_id,
893 tstate.slice_id,
894 shard_state.slice_retries + 1,
895 parameters.config.TASK_MAX_DATA_PROCESSING_ATTEMPTS,
896 self.task_retry_count() + 1,
897 parameters.config.TASK_MAX_ATTEMPTS)
901 sys.exc_clear()
902 self._try_free_lease(shard_state, slice_retry=True)
903 return self._TASK_DIRECTIVE.RETRY_SLICE
905 if parameters.config.TASK_MAX_DATA_PROCESSING_ATTEMPTS > 0:
906 logging.warning("Slice attempt %s exceeded %s max attempts.",
907 self.task_retry_count() + 1,
908 parameters.config.TASK_MAX_DATA_PROCESSING_ATTEMPTS)
909 return self._TASK_DIRECTIVE.RETRY_SHARD
911 @staticmethod
912 def get_task_name(shard_id, slice_id, retry=0):
913 """Compute single worker task name.
915 Args:
916 shard_id: shard id.
917 slice_id: slice id.
918 retry: current shard retry count.
920 Returns:
921 task name which should be used to process specified shard/slice.
922 """
925 return "appengine-mrshard-%s-%s-retry-%s" % (
926 shard_id, slice_id, retry)
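# Example (not part of the original module; the ids are hypothetical): for
# shard_id "15828509280208915759-1", slice_id 3 and retry 0 this yields
# "appengine-mrshard-15828509280208915759-1-3-retry-0". Reusing the same
# name for the same slice lets _add_task tolerate duplicate enqueues via
# TaskAlreadyExistsError/TombstonedTaskError.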
928 def _get_countdown_for_next_slice(self, spec):
929 """Get countdown for next slice's task.
931 When user sets processing rate, we set countdown to delay task execution.
933 Args:
934 spec: model.MapreduceSpec
936 Returns:
937 countdown in seconds, as an int.
938 """
939 countdown = 0
940 if self._processing_limit(spec) != -1:
941 countdown = max(
942 int(parameters.config._SLICE_DURATION_SEC -
943 (self._time() - self._start_time)), 0)
944 return countdown
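# Worked example (not part of the original module; the numbers are
# hypothetical): with a processing limit in effect, if _SLICE_DURATION_SEC
# were 15 and the slice used up its quota after 4 seconds, the next worker
# task is scheduled with countdown = 15 - 4 = 11 seconds, keeping the
# effective processing rate capped.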
946 @classmethod
947 def _state_to_task(cls,
948 tstate,
949 shard_state,
950 eta=None,
951 countdown=None):
952 """Generate task for slice according to current states.
954 Args:
955 tstate: An instance of TransientShardState.
956 shard_state: An instance of ShardState.
957 eta: Absolute time when the MR should execute. May not be specified
958 if 'countdown' is also supplied. This may be timezone-aware or
959 timezone-naive.
960 countdown: Time in seconds into the future that this MR should execute.
961 Defaults to zero.
963 Returns:
964 A model.HugeTask instance for the slice specified by current states.
965 """
966 base_path = tstate.base_path
968 task_name = MapperWorkerCallbackHandler.get_task_name(
969 tstate.shard_id,
970 tstate.slice_id,
971 tstate.retries)
973 headers = util._get_task_headers(tstate.mapreduce_spec.mapreduce_id)
974 headers[util._MR_SHARD_ID_TASK_HEADER] = tstate.shard_id
976 worker_task = model.HugeTask(
977 url=base_path + "/worker_callback/" + tstate.shard_id,
978 params=tstate.to_dict(),
979 name=task_name,
980 eta=eta,
981 countdown=countdown,
982 parent=shard_state,
983 headers=headers)
984 return worker_task
986 @classmethod
987 def _add_task(cls,
988 worker_task,
989 mapreduce_spec,
990 queue_name):
991 """Schedule slice scanning by adding it to the task queue.
993 Args:
994 worker_task: a model.HugeTask task for slice. This is NOT a taskqueue
995 task.
996 mapreduce_spec: an instance of model.MapreduceSpec.
997 queue_name: Optional queue to run on; uses the current queue of
998 execution or the default queue if unspecified.
999 """
1000 if not _run_task_hook(mapreduce_spec.get_hooks(),
1001 "enqueue_worker_task",
1002 worker_task,
1003 queue_name):
1004 try:
1007 worker_task.add(queue_name)
1008 except (taskqueue.TombstonedTaskError,
1009 taskqueue.TaskAlreadyExistsError), e:
1010 logging.warning("Task %r already exists. %s: %s",
1011 worker_task.name,
1012 e.__class__,
1013 e)
1015 def _processing_limit(self, spec):
1016 """Get the limit on the number of map calls allowed by this slice.
1018 Args:
1019 spec: a Mapreduce spec.
1021 Returns:
1022 The limit as a positive int if specified by user. -1 otherwise.
1023 """
1024 processing_rate = float(spec.mapper.params.get("processing_rate", 0))
1025 slice_processing_limit = -1
1026 if processing_rate > 0:
1027 slice_processing_limit = int(math.ceil(
1028 parameters.config._SLICE_DURATION_SEC*processing_rate/
1029 int(spec.mapper.shard_count)))
1030 return slice_processing_limit
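# Worked example (not part of the original module; the rate and shard count
# are hypothetical, and _SLICE_DURATION_SEC is assumed to be 15 here): with
# processing_rate=1000 entities/sec across 8 shards, each slice may process
# ceil(15 * 1000 / 8) = 1875 entities before _process_inputs stops early.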
1034 @classmethod
1035 def _schedule_slice(cls,
1036 shard_state,
1037 tstate,
1038 queue_name=None,
1039 eta=None,
1040 countdown=None):
1041 """Schedule slice scanning by adding it to the task queue.
1043 Args:
1044 shard_state: An instance of ShardState.
1045 tstate: An instance of TransientShardState.
1046 queue_name: Optional queue to run on; uses the current queue of
1047 execution or the default queue if unspecified.
1048 eta: Absolute time when the MR should execute. May not be specified
1049 if 'countdown' is also supplied. This may be timezone-aware or
1050 timezone-naive.
1051 countdown: Time in seconds into the future that this MR should execute.
1052 Defaults to zero.
1053 """
1054 queue_name = queue_name or os.environ.get("HTTP_X_APPENGINE_QUEUENAME",
1055 "default")
1056 task = cls._state_to_task(tstate, shard_state, eta, countdown)
1057 cls._add_task(task, tstate.mapreduce_spec, queue_name)
1060 class ControllerCallbackHandler(base_handler.HugeTaskHandler):
1061 """Supervises mapreduce execution.
1063 It is also responsible for aggregating execution status from all shards.
1065 This task runs "continuously" by re-adding itself to the taskqueue if
1066 and only if the mapreduce is still active. A mapreduce is active if it has
1067 actively running shards.
1068 """
1070 def __init__(self, *args):
1071 """Constructor."""
1072 super(ControllerCallbackHandler, self).__init__(*args)
1073 self._time = time.time
1075 def _drop_gracefully(self):
1076 """Gracefully drop controller task.
1078 This method is called when decoding controller task payload failed.
1079 Upon this we mark ShardState and MapreduceState as failed so all
1080 tasks can stop.
1082 Writing to datastore is forced (read-only mode is ignored) because we
1083 badly want the tasks to stop, and if force_writes was False,
1084 the job would never have been started.
1085 """
1086 mr_id = self.request.headers[util._MR_ID_TASK_HEADER]
1087 state = model.MapreduceState.get_by_job_id(mr_id)
1088 if not state or not state.active:
1089 return
1091 state.active = False
1092 state.result_status = model.MapreduceState.RESULT_FAILED
1093 config = util.create_datastore_write_config(state.mapreduce_spec)
1094 puts = []
1095 for ss in model.ShardState.find_all_by_mapreduce_state(state):
1096 if ss.active:
1097 ss.set_for_failure()
1098 puts.append(ss)
1100 if len(puts) > model.ShardState._MAX_STATES_IN_MEMORY:
1101 db.put(puts, config=config)
1102 puts = []
1103 db.put(puts, config=config)
1105 db.put(state, config=config)
1107 def handle(self):
1108 """Handle request."""
1109 spec = model.MapreduceSpec.from_json_str(
1110 self.request.get("mapreduce_spec"))
1111 state, control = db.get([
1112 model.MapreduceState.get_key_by_job_id(spec.mapreduce_id),
1113 model.MapreduceControl.get_key_by_job_id(spec.mapreduce_id),
1114 ])
1116 if not state:
1117 logging.warning("State not found for MR '%s'; dropping controller task.",
1118 spec.mapreduce_id)
1119 return
1120 if not state.active:
1121 logging.warning(
1122 "MR %r is not active. Looks like spurious controller task execution.",
1123 spec.mapreduce_id)
1124 self._clean_up_mr(spec)
1125 return
1127 shard_states = model.ShardState.find_all_by_mapreduce_state(state)
1128 self._update_state_from_shard_states(state, shard_states, control)
1130 if state.active:
1131 ControllerCallbackHandler.reschedule(
1132 state, spec, self.serial_id() + 1)
1134 def _update_state_from_shard_states(self, state, shard_states, control):
1135 """Update mr state by examing shard states.
1137 Args:
1138 state: current mapreduce state as MapreduceState.
1139 shard_states: an iterator over shard states.
1140 control: model.MapreduceControl entity.
1141 """
1143 state.active_shards, state.aborted_shards, state.failed_shards = 0, 0, 0
1144 total_shards = 0
1145 processed_counts = []
1146 state.counters_map.clear()
1149 for s in shard_states:
1150 total_shards += 1
1151 if s.active:
1152 state.active_shards += 1
1153 if s.result_status == model.ShardState.RESULT_ABORTED:
1154 state.aborted_shards += 1
1155 elif s.result_status == model.ShardState.RESULT_FAILED:
1156 state.failed_shards += 1
1159 state.counters_map.add_map(s.counters_map)
1160 processed_counts.append(s.counters_map.get(context.COUNTER_MAPPER_CALLS))
1162 state.set_processed_counts(processed_counts)
1163 state.last_poll_time = datetime.datetime.utcfromtimestamp(self._time())
1165 spec = state.mapreduce_spec
1167 if total_shards != spec.mapper.shard_count:
1168 logging.error("Found %d shard states. Expect %d. "
1169 "Issuing abort command to job '%s'",
1170 total_shards, spec.mapper.shard_count,
1171 spec.mapreduce_id)
1173 model.MapreduceControl.abort(spec.mapreduce_id)
1177 state.active = bool(state.active_shards)
1178 if not control and (state.failed_shards or state.aborted_shards):
1180 model.MapreduceControl.abort(spec.mapreduce_id)
1182 if not state.active:
1184 if state.failed_shards or not total_shards:
1185 state.result_status = model.MapreduceState.RESULT_FAILED
1188 elif state.aborted_shards:
1189 state.result_status = model.MapreduceState.RESULT_ABORTED
1190 else:
1191 state.result_status = model.MapreduceState.RESULT_SUCCESS
1192 self._finalize_outputs(spec, state)
1193 self._finalize_job(spec, state)
1194 else:
1195 @db.transactional(retries=5)
1196 def _put_state():
1197 """The helper for storing the state."""
1198 fresh_state = model.MapreduceState.get_by_job_id(spec.mapreduce_id)
1201 if not fresh_state.active:
1202 logging.warning(
1203 "Job %s is not active. Looks like spurious task execution. "
1204 "Dropping controller task.", spec.mapreduce_id)
1205 return
1206 config = util.create_datastore_write_config(spec)
1207 state.put(config=config)
1209 _put_state()
1211 def serial_id(self):
1212 """Get serial unique identifier of this task from request.
1214 Returns:
1215 serial identifier as int.
1216 """
1217 return int(self.request.get("serial_id"))
1219 @classmethod
1220 def _finalize_outputs(cls, mapreduce_spec, mapreduce_state):
1221 """Finalize outputs.
1223 Args:
1224 mapreduce_spec: an instance of MapreduceSpec.
1225 mapreduce_state: an instance of MapreduceState.
1226 """
1228 if (mapreduce_spec.mapper.output_writer_class() and
1229 mapreduce_state.result_status == model.MapreduceState.RESULT_SUCCESS):
1230 mapreduce_spec.mapper.output_writer_class().finalize_job(mapreduce_state)
1232 @classmethod
1233 def _finalize_job(cls, mapreduce_spec, mapreduce_state):
1234 """Finalize job execution.
1236 Invokes the done callback and saves mapreduce state in a transaction,
1237 and schedules necessary clean-ups. This method is idempotent.
1239 Args:
1240 mapreduce_spec: an instance of MapreduceSpec
1241 mapreduce_state: an instance of MapreduceState
1242 """
1243 config = util.create_datastore_write_config(mapreduce_spec)
1244 queue_name = util.get_queue_name(mapreduce_spec.params.get(
1245 model.MapreduceSpec.PARAM_DONE_CALLBACK_QUEUE))
1246 done_callback = mapreduce_spec.params.get(
1247 model.MapreduceSpec.PARAM_DONE_CALLBACK)
1248 done_task = None
1249 if done_callback:
1250 done_task = taskqueue.Task(
1251 url=done_callback,
1252 headers=util._get_task_headers(mapreduce_spec.mapreduce_id,
1253 util.CALLBACK_MR_ID_TASK_HEADER),
1254 method=mapreduce_spec.params.get("done_callback_method", "POST"))
1256 @db.transactional(retries=5)
1257 def _put_state():
1258 """Helper to store state."""
1259 fresh_state = model.MapreduceState.get_by_job_id(
1260 mapreduce_spec.mapreduce_id)
1261 if not fresh_state.active:
1262 logging.warning(
1263 "Job %s is not active. Looks like spurious task execution. "
1264 "Dropping task.", mapreduce_spec.mapreduce_id)
1265 return
1266 mapreduce_state.put(config=config)
1268 if done_task and not _run_task_hook(
1269 mapreduce_spec.get_hooks(),
1270 "enqueue_done_task",
1271 done_task,
1272 queue_name):
1273 done_task.add(queue_name, transactional=True)
1275 _put_state()
1276 logging.info("Final result for job '%s' is '%s'",
1277 mapreduce_spec.mapreduce_id, mapreduce_state.result_status)
1278 cls._clean_up_mr(mapreduce_spec)
1280 @classmethod
1281 def _clean_up_mr(cls, mapreduce_spec):
1282 FinalizeJobHandler.schedule(mapreduce_spec)
1284 @staticmethod
1285 def get_task_name(mapreduce_spec, serial_id):
1286 """Compute single controller task name.
1288 Args:
1289 mapreduce_spec: specification of the mapreduce.
1290 serial_id: id of the invocation as int.
1292 Returns:
1293 task name which should be used for this controller invocation.
1294 """
1297 return "appengine-mrcontrol-%s-%s" % (
1298 mapreduce_spec.mapreduce_id, serial_id)
1300 @staticmethod
1301 def controller_parameters(mapreduce_spec, serial_id):
1302 """Fill in controller task parameters.
1304 The returned parameters map is to be used as the task payload, and it
1305 contains all the data required by the controller to perform its function.
1307 Args:
1308 mapreduce_spec: specification of the mapreduce.
1309 serial_id: id of the invocation as int.
1311 Returns:
1312 string->string map of parameters to be used as task payload.
1313 """
1314 return {"mapreduce_spec": mapreduce_spec.to_json_str(),
1315 "serial_id": str(serial_id)}
1317 @classmethod
1318 def reschedule(cls,
1319 mapreduce_state,
1320 mapreduce_spec,
1321 serial_id,
1322 queue_name=None):
1323 """Schedule new update status callback task.
1325 Args:
1326 mapreduce_state: mapreduce state as model.MapreduceState
1327 mapreduce_spec: mapreduce specification as MapreduceSpec.
1328 serial_id: id of the invocation as int.
1329 queue_name: The queue to schedule this task on. Will use the current
1330 queue of execution if not supplied.
1331 """
1332 task_name = ControllerCallbackHandler.get_task_name(
1333 mapreduce_spec, serial_id)
1334 task_params = ControllerCallbackHandler.controller_parameters(
1335 mapreduce_spec, serial_id)
1336 if not queue_name:
1337 queue_name = os.environ.get("HTTP_X_APPENGINE_QUEUENAME", "default")
1339 controller_callback_task = model.HugeTask(
1340 url=(mapreduce_spec.params["base_path"] + "/controller_callback/" +
1341 mapreduce_spec.mapreduce_id),
1342 name=task_name, params=task_params,
1343 countdown=parameters.config._CONTROLLER_PERIOD_SEC,
1344 parent=mapreduce_state,
1345 headers=util._get_task_headers(mapreduce_spec.mapreduce_id))
1347 if not _run_task_hook(mapreduce_spec.get_hooks(),
1348 "enqueue_controller_task",
1349 controller_callback_task,
1350 queue_name):
1351 try:
1352 controller_callback_task.add(queue_name)
1353 except (taskqueue.TombstonedTaskError,
1354 taskqueue.TaskAlreadyExistsError), e:
1355 logging.warning("Task %r with params %r already exists. %s: %s",
1356 task_name, task_params, e.__class__, e)
1359 class KickOffJobHandler(base_handler.TaskQueueHandler):
1360 """Taskqueue handler which kicks off a mapreduce processing.
1362 This handler is idempotent.
1364 Precondition:
1365 The Model.MapreduceState entity for this mr is already created and
1366 saved to datastore by StartJobHandler._start_map.
1368 Request Parameters:
1369 mapreduce_id: the id of the mapreduce, as a string.
1370 """
1373 _SERIALIZED_INPUT_READERS_KEY = "input_readers_for_mr_%s"
1375 def handle(self):
1376 """Handles kick off request."""
1378 mr_id = self.request.get("mapreduce_id")
1380 logging.info("Processing kickoff for job %s", mr_id)
1381 state = model.MapreduceState.get_by_job_id(mr_id)
1382 if not self._check_mr_state(state, mr_id):
1383 return
1386 readers, serialized_readers_entity = self._get_input_readers(state)
1387 if readers is None:
1389 logging.warning("Found no mapper input data to process.")
1390 state.active = False
1391 state.result_status = model.MapreduceState.RESULT_SUCCESS
1392 ControllerCallbackHandler._finalize_job(
1393 state.mapreduce_spec, state)
1394 return False
1397 self._setup_output_writer(state)
1401 result = self._save_states(state, serialized_readers_entity)
1402 if result is None:
1403 readers, _ = self._get_input_readers(state)
1404 elif not result:
1405 return
1407 queue_name = self.request.headers.get("X-AppEngine-QueueName")
1408 KickOffJobHandler._schedule_shards(state.mapreduce_spec, readers,
1409 queue_name,
1410 state.mapreduce_spec.params["base_path"],
1411 state)
1413 ControllerCallbackHandler.reschedule(
1414 state, state.mapreduce_spec, serial_id=0, queue_name=queue_name)
1416 def _drop_gracefully(self):
1417 """See parent."""
1418 mr_id = self.request.get("mapreduce_id")
1419 logging.error("Failed to kick off job %s", mr_id)
1421 state = model.MapreduceState.get_by_job_id(mr_id)
1422 if not self._check_mr_state(state, mr_id):
1423 return
1426 config = util.create_datastore_write_config(state.mapreduce_spec)
1427 model.MapreduceControl.abort(mr_id, config=config)
1430 state.active = False
1431 state.result_status = model.MapreduceState.RESULT_FAILED
1432 ControllerCallbackHandler._finalize_job(state.mapreduce_spec, state)
1434 def _get_input_readers(self, state):
1435 """Get input readers.
1437 Args:
1438 state: a MapreduceState model.
1440 Returns:
1441 A tuple: (a list of input readers, a model._HugeTaskPayload entity).
1442 The payload entity contains the json serialized input readers.
1443 (None, None) when input reader splitting returned no data to process.
1444 """
1445 serialized_input_readers_key = (self._SERIALIZED_INPUT_READERS_KEY %
1446 state.key().id_or_name())
1447 serialized_input_readers = model._HugeTaskPayload.get_by_key_name(
1448 serialized_input_readers_key, parent=state)
1451 input_reader_class = state.mapreduce_spec.mapper.input_reader_class()
1452 split_param = state.mapreduce_spec.mapper
1453 if issubclass(input_reader_class, map_job.InputReader):
1454 split_param = map_job.JobConfig._to_map_job_config(
1455 state.mapreduce_spec,
1456 os.environ.get("HTTP_X_APPENGINE_QUEUENAME"))
1457 if serialized_input_readers is None:
1458 readers = input_reader_class.split_input(split_param)
1459 else:
1460 readers = [input_reader_class.from_json_str(json) for json in
1461 simplejson.loads(serialized_input_readers.payload)]
1463 if not readers:
1464 return None, None
1467 state.mapreduce_spec.mapper.shard_count = len(readers)
1468 state.active_shards = len(readers)
1471 if serialized_input_readers is None:
1473 serialized_input_readers = model._HugeTaskPayload(
1474 key_name=serialized_input_readers_key, parent=state)
1475 readers_json_str = [i.to_json_str() for i in readers]
1476 serialized_input_readers.payload = simplejson.dumps(readers_json_str)
1477 return readers, serialized_input_readers
1479 def _setup_output_writer(self, state):
1480 if not state.writer_state:
1481 output_writer_class = state.mapreduce_spec.mapper.output_writer_class()
1482 if output_writer_class:
1483 output_writer_class.init_job(state)
1485 @db.transactional
1486 def _save_states(self, state, serialized_readers_entity):
1487 """Run transaction to save state.
1489 Args:
1490 state: a model.MapreduceState entity.
1491 serialized_readers_entity: a model._HugeTaskPayload entity containing
1492 json serialized input readers.
1494 Returns:
1495 False if a fatal error is encountered and this task should be dropped
1496 immediately. True if transaction is successful. None if a previous
1497 attempt of this same transaction has already succeeded.
1498 """
1499 mr_id = state.key().id_or_name()
1500 fresh_state = model.MapreduceState.get_by_job_id(mr_id)
1501 if not self._check_mr_state(fresh_state, mr_id):
1502 return False
1503 if fresh_state.active_shards != 0:
1504 logging.warning(
1505 "Mapreduce %s already has active shards. Looks like spurious task "
1506 "execution.", mr_id)
1507 return None
1508 config = util.create_datastore_write_config(state.mapreduce_spec)
1509 db.put([state, serialized_readers_entity], config=config)
1510 return True
1512 @classmethod
1513 def _schedule_shards(cls,
1514 spec,
1515 readers,
1516 queue_name,
1517 base_path,
1518 mr_state):
1519 """Prepares shard states and schedules their execution.
1521 Even though this method does not schedule shard task and save shard state
1522 transactionally, it's safe for taskqueue to retry this logic because
1523 the initial shard_state for each shard is the same from any retry.
1524 This is an important yet reasonable assumption on model.ShardState.
1526 Args:
1527 spec: mapreduce specification as MapreduceSpec.
1528 readers: list of InputReaders describing shard splits.
1529 queue_name: The queue to run this job on.
1530 base_path: The base url path of mapreduce callbacks.
1531 mr_state: The MapreduceState of the current job.
1532 """
1534 shard_states = []
1535 for shard_number, input_reader in enumerate(readers):
1536 shard_state = model.ShardState.create_new(spec.mapreduce_id, shard_number)
1537 shard_state.shard_description = str(input_reader)
1538 shard_states.append(shard_state)
1541 existing_shard_states = db.get(shard.key() for shard in shard_states)
1542 existing_shard_keys = set(shard.key() for shard in existing_shard_states
1543 if shard is not None)
1547 db.put((shard for shard in shard_states
1548 if shard.key() not in existing_shard_keys),
1549 config=util.create_datastore_write_config(spec))
1552 writer_class = spec.mapper.output_writer_class()
1553 writers = [None] * len(readers)
1554 if writer_class:
1555 for shard_number, shard_state in enumerate(shard_states):
1556 writers[shard_number] = writer_class.create(
1557 mr_state.mapreduce_spec,
1558 shard_state.shard_number, shard_state.retries + 1,
1559 mr_state.writer_state)
1564 for shard_number, (input_reader, output_writer) in enumerate(
1565 zip(readers, writers)):
1566 shard_id = model.ShardState.shard_id_from_number(
1567 spec.mapreduce_id, shard_number)
1568 task = MapperWorkerCallbackHandler._state_to_task(
1569 model.TransientShardState(
1570 base_path, spec, shard_id, 0, input_reader, input_reader,
1571 output_writer=output_writer,
1572 handler=spec.mapper.handler),
1573 shard_states[shard_number])
1574 MapperWorkerCallbackHandler._add_task(task,
1575 spec,
1576 queue_name)
1578 @classmethod
1579 def _check_mr_state(cls, state, mr_id):
1580 """Check MapreduceState.
1582 Args:
1583 state: a MapreduceState instance.
1584 mr_id: mapreduce id.
1586 Returns:
1587 True if state is valid. False if not and this task should be dropped.
1588 """
1589 if state is None:
1590 logging.warning(
1591 "Mapreduce State for job %s is missing. Dropping Task.",
1592 mr_id)
1593 return False
1594 if not state.active:
1595 logging.warning(
1596 "Mapreduce %s is not active. Looks like spurious task "
1597 "execution. Dropping Task.", mr_id)
1598 return False
1599 return True
1602 class StartJobHandler(base_handler.PostJsonHandler):
1603 """Command handler starts a mapreduce job.
1605 This handler allows the user to start an MR via a web form. Its _start_map
1606 method can also be used independently to start a mapreduce.
1607 """
1609 def handle(self):
1610 """Handles start request."""
1612 mapreduce_name = self._get_required_param("name")
1613 mapper_input_reader_spec = self._get_required_param("mapper_input_reader")
1614 mapper_handler_spec = self._get_required_param("mapper_handler")
1615 mapper_output_writer_spec = self.request.get("mapper_output_writer")
1616 mapper_params = self._get_params(
1617 "mapper_params_validator", "mapper_params.")
1618 params = self._get_params(
1619 "params_validator", "params.")
1622 mr_params = map_job.JobConfig._get_default_mr_params()
1623 mr_params.update(params)
1624 if "queue_name" in mapper_params:
1625 mr_params["queue_name"] = mapper_params["queue_name"]
1628 mapper_params["processing_rate"] = int(mapper_params.get(
1629 "processing_rate") or parameters.config.PROCESSING_RATE_PER_SEC)
1632 mapper_spec = model.MapperSpec(
1633 mapper_handler_spec,
1634 mapper_input_reader_spec,
1635 mapper_params,
1636 int(mapper_params.get("shard_count", parameters.config.SHARD_COUNT)),
1637 output_writer_spec=mapper_output_writer_spec)
1639 mapreduce_id = self._start_map(
1640 mapreduce_name,
1641 mapper_spec,
1642 mr_params,
1643 queue_name=mr_params["queue_name"],
1644 _app=mapper_params.get("_app"))
1645 self.json_response["mapreduce_id"] = mapreduce_id
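# Illustrative request (not part of the original module; the reader, handler
# and parameter values are hypothetical): a POST to this handler with form
# fields such as
#   name=WordCount
#   mapper_input_reader=mapreduce.input_readers.DatastoreInputReader
#   mapper_handler=main.word_count_map
#   mapper_params.entity_kind=Article
#   params.done_callback=/done
# builds a MapperSpec from the mapper_* fields, starts the job via
# _start_map, and returns {"mapreduce_id": "<new job id>"} as JSON.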
1647 def _get_params(self, validator_parameter, name_prefix):
1648 """Retrieves additional user-supplied params for the job and validates them.
1650 Args:
1651 validator_parameter: name of the request parameter which supplies
1652 validator for this parameter set.
1653 name_prefix: common prefix for all parameter names in the request.
1655 Raises:
1656 Any exception raised by the 'params_validator' request parameter if
1657 the params fail to validate.
1659 Returns:
1660 The user parameters.
1661 """
1662 params_validator = self.request.get(validator_parameter)
1664 user_params = {}
1665 for key in self.request.arguments():
1666 if key.startswith(name_prefix):
1667 values = self.request.get_all(key)
1668 adjusted_key = key[len(name_prefix):]
1669 if len(values) == 1:
1670 user_params[adjusted_key] = values[0]
1671 else:
1672 user_params[adjusted_key] = values
1674 if params_validator:
1675 resolved_validator = util.for_name(params_validator)
1676 resolved_validator(user_params)
1678 return user_params
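# Example (not part of the original module; the values are hypothetical):
# with name_prefix="mapper_params." a request carrying
# "mapper_params.entity_kind=Article" and "mapper_params.batch_size=50"
# yields {"entity_kind": "Article", "batch_size": "50"}; a key supplied more
# than once becomes a list of values. The optional validator named by
# validator_parameter is resolved with util.for_name and called on this dict.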
1680 def _get_required_param(self, param_name):
1681 """Get a required request parameter.
1683 Args:
1684 param_name: name of request parameter to fetch.
1686 Returns:
1687 parameter value
1689 Raises:
1690 errors.NotEnoughArgumentsError: if parameter is not specified.
1691 """
1692 value = self.request.get(param_name)
1693 if not value:
1694 raise errors.NotEnoughArgumentsError(param_name + " not specified")
1695 return value
1697 @classmethod
1698 def _start_map(cls,
1699 name,
1700 mapper_spec,
1701 mapreduce_params,
1702 queue_name,
1703 eta=None,
1704 countdown=None,
1705 hooks_class_name=None,
1706 _app=None,
1707 in_xg_transaction=False):
1710 """See control.start_map.
1712 Requirements for this method:
1713 1. The request that invokes this method can either be regular or
1714 from taskqueue, so taskqueue-specific headers cannot be used.
1715 2. Each invocation transactionally starts an isolated mapreduce job with
1716 a unique id. MapreduceState should be immediately available after
1717 returning. See control.start_map's doc on transactional.
1718 3. Method should be lightweight.
1719 """
1721 mapper_input_reader_class = mapper_spec.input_reader_class()
1722 mapper_input_reader_class.validate(mapper_spec)
1725 mapper_output_writer_class = mapper_spec.output_writer_class()
1726 if mapper_output_writer_class:
1727 mapper_output_writer_class.validate(mapper_spec)
1730 mapreduce_id = model.MapreduceState.new_mapreduce_id()
1731 mapreduce_spec = model.MapreduceSpec(
1732 name,
1733 mapreduce_id,
1734 mapper_spec.to_json(),
1735 mapreduce_params,
1736 hooks_class_name)
1739 ctx = context.Context(mapreduce_spec, None)
1740 context.Context._set(ctx)
1741 try:
1743 mapper_spec.handler
1744 finally:
1745 context.Context._set(None)
1748 if in_xg_transaction:
1749 propagation = db.MANDATORY
1750 else:
1751 propagation = db.INDEPENDENT
1753 @db.transactional(propagation=propagation)
1754 def _txn():
1755 cls._create_and_save_state(mapreduce_spec, _app)
1756 cls._add_kickoff_task(mapreduce_params["base_path"], mapreduce_spec, eta,
1757 countdown, queue_name)
1758 _txn()
1760 return mapreduce_id
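# Illustrative sketch (not part of the original module): starting a job
# programmatically, roughly as control.start_map is described to do above.
# Every value below is hypothetical, and mr_params is assumed to come from
# map_job.JobConfig._get_default_mr_params() so that "base_path" and
# "queue_name" are present, as StartJobHandler.handle relies on.
#
# def start_word_count():
#   mapper_spec = model.MapperSpec(
#       "main.word_count_map",
#       "mapreduce.input_readers.DatastoreInputReader",
#       {"entity_kind": "Article",
#        "processing_rate": parameters.config.PROCESSING_RATE_PER_SEC},
#       8)
#   mr_params = map_job.JobConfig._get_default_mr_params()
#   return StartJobHandler._start_map(
#       "WordCount", mapper_spec, mr_params,
#       queue_name=mr_params["queue_name"])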
1762 @classmethod
1763 def _create_and_save_state(cls, mapreduce_spec, _app):
1764 """Save mapreduce state to datastore.
1766 Save state to datastore so that UI can see it immediately.
1768 Args:
1769 mapreduce_spec: model.MapreduceSpec,
1770 _app: app id if specified. None otherwise.
1772 Returns:
1773 The saved Mapreduce state.
1774 """
1775 state = model.MapreduceState.create_new(mapreduce_spec.mapreduce_id)
1776 state.mapreduce_spec = mapreduce_spec
1777 state.active = True
1778 state.active_shards = 0
1779 if _app:
1780 state.app_id = _app
1781 config = util.create_datastore_write_config(mapreduce_spec)
1782 state.put(config=config)
1783 return state
1785 @classmethod
1786 def _add_kickoff_task(cls,
1787 base_path,
1788 mapreduce_spec,
1789 eta,
1790 countdown,
1791 queue_name):
1792 """Enqueues a new kickoff task."""
1793 params = {"mapreduce_id": mapreduce_spec.mapreduce_id}
1795 kickoff_task = taskqueue.Task(
1796 url=base_path + "/kickoffjob_callback/" + mapreduce_spec.mapreduce_id,
1797 headers=util._get_task_headers(mapreduce_spec.mapreduce_id),
1798 params=params,
1799 eta=eta,
1800 countdown=countdown)
1801 hooks = mapreduce_spec.get_hooks()
1802 if hooks is not None:
1803 try:
1804 hooks.enqueue_kickoff_task(kickoff_task, queue_name)
1805 return
1806 except NotImplementedError:
1807 pass
1808 kickoff_task.add(queue_name, transactional=True)
1811 class FinalizeJobHandler(base_handler.TaskQueueHandler):
1812 """Finalize map job by deleting all temporary entities."""
1814 def handle(self):
1815 mapreduce_id = self.request.get("mapreduce_id")
1816 mapreduce_state = model.MapreduceState.get_by_job_id(mapreduce_id)
1817 if mapreduce_state:
1818 config = (
1819 util.create_datastore_write_config(mapreduce_state.mapreduce_spec))
1820 keys = [model.MapreduceControl.get_key_by_job_id(mapreduce_id)]
1821 for ss in model.ShardState.find_all_by_mapreduce_state(mapreduce_state):
1822 keys.extend(list(
1823 model._HugeTaskPayload.all().ancestor(ss).run(keys_only=True)))
1824 keys.extend(list(model._HugeTaskPayload.all().ancestor(
1825 mapreduce_state).run(keys_only=True)))
1826 db.delete(keys, config=config)
1828 @classmethod
1829 def schedule(cls, mapreduce_spec):
1830 """Schedule finalize task.
1832 Args:
1833 mapreduce_spec: mapreduce specification as MapreduceSpec.
1834 """
1835 task_name = mapreduce_spec.mapreduce_id + "-finalize"
1836 finalize_task = taskqueue.Task(
1837 name=task_name,
1838 url=(mapreduce_spec.params["base_path"] + "/finalizejob_callback/" +
1839 mapreduce_spec.mapreduce_id),
1840 params={"mapreduce_id": mapreduce_spec.mapreduce_id},
1841 headers=util._get_task_headers(mapreduce_spec.mapreduce_id))
1842 queue_name = util.get_queue_name(None)
1843 if not _run_task_hook(mapreduce_spec.get_hooks(),
1844 "enqueue_controller_task",
1845 finalize_task,
1846 queue_name):
1847 try:
1848 finalize_task.add(queue_name)
1849 except (taskqueue.TombstonedTaskError,
1850 taskqueue.TaskAlreadyExistsError), e:
1851 logging.warning("Task %r already exists. %s: %s",
1852 task_name, e.__class__, e)
1855 class CleanUpJobHandler(base_handler.PostJsonHandler):
1856 """Command to kick off tasks to clean up a job's data."""
1858 def handle(self):
1859 mapreduce_id = self.request.get("mapreduce_id")
1861 mapreduce_state = model.MapreduceState.get_by_job_id(mapreduce_id)
1862 if mapreduce_state:
1863 shard_keys = model.ShardState.calculate_keys_by_mapreduce_state(
1864 mapreduce_state)
1865 db.delete(shard_keys)
1866 db.delete(mapreduce_state)
1867 self.json_response["status"] = ("Job %s successfully cleaned up." %
1868 mapreduce_id)
1871 class AbortJobHandler(base_handler.PostJsonHandler):
1872 """Command to abort a running job."""
1874 def handle(self):
1875 model.MapreduceControl.abort(self.request.get("mapreduce_id"))
1876 self.json_response["status"] = "Abort signal sent."