App Engine Python SDK version 1.9.3
[gae.git] / python / google / appengine / ext / mapreduce / api / map_job / map_job_control.py
blobabb4cf9e5182835549500c64933c2235dc498ade
1 #!/usr/bin/env python
3 # Copyright 2007 Google Inc.
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
9 # http://www.apache.org/licenses/LICENSE-2.0
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
17 """User API for controlling Map job execution."""
19 from google.appengine.api import taskqueue
20 from google.appengine.datastore import datastore_rpc
21 from google.appengine.ext import db
22 from google.appengine.ext.mapreduce import model
23 from google.appengine.ext.mapreduce import util
24 from google.appengine.ext.mapreduce.api.map_job import map_job_config
class Job(object):
  """The job submitter's view of the job.

  The class allows user to submit a job, control a submitted job,
  query its state and result.
  """

  # Status values. RUNNING is local to this class; the terminal values are
  # the result statuses stored on model.MapreduceState.
  RUNNING = "running"
  FAILED = model.MapreduceState.RESULT_FAILED
  ABORTED = model.MapreduceState.RESULT_ABORTED
  SUCCESS = model.MapreduceState.RESULT_SUCCESS

  STATUS_ENUM = [RUNNING, FAILED, ABORTED, SUCCESS]

  def __init__(self, state=None):
    """Init the job instance from a saved job state.

    Do not directly call this method. Use class methods to construct
    new instances.

    Args:
      state: model.MapreduceState. Despite the None default, a real state
        is required: its mapreduce_spec is read right away to rebuild
        job_config.
    """
    self._state = state

    self.job_config = map_job_config.JobConfig._to_map_job_config(
        state.mapreduce_spec,
        queue_name=state.mapreduce_spec.params.get("queue_name"))

  @classmethod
  def get_job_by_id(cls, job_id=None):
    """Gets the job instance representing the job with id job_id.

    Args:
      job_id: a job id, job_config.job_id, of a submitted job.

    Returns:
      A Job instance for job_id.

    Raises:
      ValueError: if no state is stored for job_id.
    """
    state = cls.__get_state_by_id(job_id)
    return cls(state)

  def get_status(self):
    """Get status enum.

    Refreshes the cached state first if the job was still active.

    Returns:
      One of the status enum.
    """
    self.__update_state()
    if self._state.active:
      return self.RUNNING
    return self._state.result_status

  def abort(self):
    """Aborts the job."""
    model.MapreduceControl.abort(self.job_config.job_id)

  def get_counters(self):
    """Get counters from this job.

    When a job is running, counter values won't be very accurate.

    Returns:
      An iterator that returns (counter_name, value) pairs of type
      (basestring, int)
    """
    self.__update_state()
    return self._state.counters_map.counters.iteritems()

  def get_outputs(self):
    """Get outputs of this job.

    Should only call if status is SUCCESS.

    Yields:
      Iterators, one for each shard. Each iterator is
      from the argument of map_job.output_writer.commit_output.

    Raises:
      AssertionError: if the job status is not SUCCESS. Since this is a
        generator, the check runs when iteration starts.
    """
    # Explicit check instead of a bare `assert`, which is stripped (together
    # with the state refresh done by get_status) when Python runs with -O.
    # AssertionError is kept so existing callers see the same exception type.
    status = self.get_status()
    if status != self.SUCCESS:
      raise AssertionError(
          "Outputs are only available for a successful job; status is %r."
          % (status,))

    shard_states = model.ShardState.find_all_by_mapreduce_state(self._state)
    for shard_state in shard_states:
      yield iter(shard_state.writer_state.get("outs", []))

  @classmethod
  def submit(cls, job_config, in_xg_transaction=False):
    """Submit the job to run.

    Args:
      job_config: an instance of map_job.JobConfig.
      in_xg_transaction: controls what transaction scope to use to start this MR
        job. If True, there has to be an already opened cross-group transaction
        scope. MR will use one entity group from it.
        If False, MR will create an independent transaction to start the job
        regardless of any existing transaction scopes.

    Returns:
      a Job instance representing the submitted job.
    """
    cls.__validate_job_config(job_config)
    mapper_spec = job_config._get_mapper_spec()

    # Build the mapreduce spec describing this job.
    mapreduce_params = job_config._get_mr_params()
    mapreduce_spec = model.MapreduceSpec(
        job_config.job_name,
        job_config.job_id,
        mapper_spec.to_json(),
        mapreduce_params,
        util._obj_to_path(job_config._hooks_cls))

    # Pick the transaction scope requested by the caller.
    if in_xg_transaction:
      propagation = db.MANDATORY
    else:
      propagation = db.INDEPENDENT

    # Save the state and enqueue the kickoff task in the same transaction.
    @db.transactional(propagation=propagation)
    def _txn():
      state = cls.__create_and_save_state(job_config, mapreduce_spec)
      cls.__add_kickoff_task(job_config, mapreduce_spec)
      return state

    return cls(_txn())

  def __update_state(self):
    """Fetches most up to date state from db."""
    # Only refetch while the cached state is still marked active.
    if self._state.active:
      self._state = self.__get_state_by_id(self.job_config.job_id)

  @classmethod
  def __get_state_by_id(cls, job_id):
    """Get job state by id.

    Args:
      job_id: job id.

    Returns:
      model.MapreduceState for the job.

    Raises:
      ValueError: if the job state is missing.
    """
    state = model.MapreduceState.get_by_job_id(job_id)
    if state is None:
      raise ValueError("Job state for job %s is missing." % job_id)
    return state

  @classmethod
  def __validate_job_config(cls, job_config):
    """Validate job_config with its input reader and output writer classes.

    The input reader validates the job config itself; the output writer, when
    one is configured, validates the mapper spec derived from it.

    Args:
      job_config: map_job.JobConfig.
    """
    job_config.input_reader_cls.validate(job_config)
    if job_config.output_writer_cls:
      job_config.output_writer_cls.validate(job_config._get_mapper_spec())

  @classmethod
  def __create_and_save_state(cls, job_config, mapreduce_spec):
    """Save map job state to datastore.

    Save state to datastore so that UI can see it immediately.

    Args:
      job_config: map_job.JobConfig.
      mapreduce_spec: model.MapreduceSpec.

    Returns:
      model.MapreduceState for this job.
    """
    state = model.MapreduceState.create_new(job_config.job_id)
    state.mapreduce_spec = mapreduce_spec
    state.active = True
    state.active_shards = 0
    state.app_id = job_config._app
    config = datastore_rpc.Configuration(force_writes=job_config._force_writes)
    state.put(config=config)
    return state

  @classmethod
  def __add_kickoff_task(cls, job_config, mapreduce_spec):
    """Add kickoff task to taskqueue.

    Args:
      job_config: map_job.JobConfig.
      mapreduce_spec: model.MapreduceSpec.
    """
    params = {"mapreduce_id": job_config.job_id}
    # No task name is given; the default path below adds the task
    # transactionally.
    kickoff_task = taskqueue.Task(
        url=job_config._base_path + "/kickoffjob_callback/" + job_config.job_id,
        headers=util._get_task_headers(job_config.job_id),
        params=params)
    if job_config._hooks_cls:
      hooks = job_config._hooks_cls(mapreduce_spec)
      try:
        hooks.enqueue_kickoff_task(kickoff_task, job_config.queue_name)
        return
      except NotImplementedError:
        # The hooks class declined to enqueue; use the default path below.
        pass
    kickoff_task.add(job_config.queue_name, transactional=True)