3 # Copyright 2007 Google Inc.
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
9 # http://www.apache.org/licenses/LICENSE-2.0
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
17 """User API for controlling Map job execution."""
19 from google
.appengine
.api
import taskqueue
20 from google
.appengine
.datastore
import datastore_rpc
21 from google
.appengine
.ext
import db
22 from google
.appengine
.ext
.mapreduce
import model
23 from google
.appengine
.ext
.mapreduce
import util
24 from google
.appengine
.ext
.mapreduce
.api
.map_job
import map_job_config
"""The job submitter's view of the job.

The class allows user to submit a job, control a submitted job,
query its state and result.
"""

# Status reported by get_status() while the job's state is still active.
# NOTE(review): value reconstructed; confirm against upstream.
RUNNING = "running"
# Terminal statuses mirror the result constants on model.MapreduceState.
FAILED = model.MapreduceState.RESULT_FAILED
ABORTED = model.MapreduceState.RESULT_ABORTED
SUCCESS = model.MapreduceState.RESULT_SUCCESS

# The status values exposed by this class.
STATUS_ENUM = [RUNNING, FAILED, ABORTED, SUCCESS]
def __init__(self, state=None):
  """Init the job instance representing the job stored in `state`.

  Do not directly call this method. Use class methods to construct
  new instances.

  Args:
    state: model.MapreduceState. Despite the None default, a state is
      required: its mapreduce_spec is read unconditionally below.
  """
  self._state = state
  # Rebuild the user-facing JobConfig from the persisted MapreduceSpec; the
  # queue name is read from the spec's params.
  self.job_config = map_job_config.JobConfig._to_map_job_config(
      state.mapreduce_spec,
      queue_name=state.mapreduce_spec.params.get("queue_name"))
@classmethod
def get_job_by_id(cls, job_id=None):
  """Gets the job instance representing the job with id job_id.

  Args:
    job_id: a job id, job_config.job_id, of a submitted job.

  Returns:
    A Job instance for job_id.

  Raises:
    ValueError: if no state is stored for job_id.
  """
  state = cls.__get_state_by_id(job_id)
  return cls(state)
def get_status(self):
  """Get status enum.

  While the cached state is active it is first re-read from the datastore.

  Returns:
    One of the status enum: RUNNING while the job's state is active,
    otherwise the result status recorded on the state.
  """
  self.__update_state()
  if self._state.active:
    return self.RUNNING
  return self._state.result_status
def abort(self):
  """Aborts the job.

  Delegates to model.MapreduceControl.abort with this job's id.
  """
  model.MapreduceControl.abort(self.job_config.job_id)
def get_counters(self):
  """Get counters from this job.

  When a job is running, counter values won't be very accurate.

  Returns:
    An iterator over (counter_name, value) pairs taken from the job's
    counters map.
  """
  self.__update_state()
  return self._state.counters_map.counters.iteritems()
def get_outputs(self):
  """Get outputs of this job.

  Should only call if status is SUCCESS. The status is checked when this
  method is called, not when the result is first iterated.

  Returns:
    A generator of iterators, one for each shard. Each iterator is
    from the argument of map_job.output_writer.commit_output.

  Raises:
    AssertionError: if the job's status is not SUCCESS.
  """
  status = self.get_status()
  # Raise explicitly rather than `assert`: an assert inside a generator only
  # fires on first iteration, and is removed entirely under `python -O`.
  if status != self.SUCCESS:
    raise AssertionError(
        "Outputs are only available for a successful job; status is %r."
        % (status,))
  shard_states = model.ShardState.find_all_by_mapreduce_state(self._state)
  return (iter(s.writer_state.get("outs", [])) for s in shard_states)
@classmethod
def submit(cls, job_config, in_xg_transaction=False):
  """Submit the job to run.

  Args:
    job_config: an instance of map_job.MapJobConfig.
    in_xg_transaction: controls what transaction scope to use to start this MR
      job. If True, there has to be an already opened cross-group transaction
      scope. MR will use one entity group from it.
      If False, MR will create an independent transaction to start the job
      regardless of any existing transaction scopes.

  Returns:
    a Job instance representing the submitted job.
  """
  cls.__validate_job_config(job_config)
  mapper_spec = job_config._get_mapper_spec()

  # Build the persisted MapreduceSpec from the user-facing config.
  # NOTE(review): the name/id arguments are reconstructed; confirm against
  # model.MapreduceSpec's constructor signature.
  mapreduce_params = job_config._get_mr_params()
  mapreduce_spec = model.MapreduceSpec(
      job_config.job_name,
      job_config.job_id,
      mapper_spec.to_json(),
      mapreduce_params,
      util._obj_to_path(job_config._hooks_cls))

  # MANDATORY joins the caller's open transaction; INDEPENDENT starts a new
  # one regardless of any surrounding transaction.
  if in_xg_transaction:
    propagation = db.MANDATORY
  else:
    propagation = db.INDEPENDENT

  # Save the state and enqueue the kickoff task inside the same transaction.
  @db.transactional(propagation=propagation)
  def _txn():
    state = cls.__create_and_save_state(job_config, mapreduce_spec)
    cls.__add_kickoff_task(job_config, mapreduce_spec)
    return state

  return cls(_txn())
def __update_state(self):
  """Fetches most up to date state from db."""
  # Only re-read while the job is active; presumably an inactive state no
  # longer changes, so the cached copy is kept.
  if self._state.active:
    self._state = self.__get_state_by_id(self.job_config.job_id)
@classmethod
def __get_state_by_id(cls, job_id):
  """Get job state by id.

  Args:
    job_id: job id.

  Returns:
    model.MapreduceState for the job.

  Raises:
    ValueError: if the job state is missing.
  """
  state = model.MapreduceState.get_by_job_id(job_id)
  if state is None:
    raise ValueError("Job state for job %s is missing." % job_id)
  return state
@classmethod
def __validate_job_config(cls, job_config):
  """Validates job_config with its input reader and output writer classes.

  Args:
    job_config: map_job.JobConfig.
  """
  job_config.input_reader_cls.validate(job_config)
  # The output writer is optional; when present it validates the mapper spec
  # derived from the config rather than the config itself.
  if job_config.output_writer_cls:
    job_config.output_writer_cls.validate(job_config._get_mapper_spec())
@classmethod
def __create_and_save_state(cls, job_config, mapreduce_spec):
  """Save map job state to datastore.

  Save state to datastore so that UI can see it immediately.

  Args:
    job_config: map_job.JobConfig.
    mapreduce_spec: model.MapreduceSpec.

  Returns:
    model.MapreduceState for this job.
  """
  state = model.MapreduceState.create_new(job_config.job_id)
  state.mapreduce_spec = mapreduce_spec
  # Mark the new job active so get_status() reports RUNNING.
  state.active = True
  state.active_shards = 0
  state.app_id = job_config._app
  config = datastore_rpc.Configuration(force_writes=job_config._force_writes)
  state.put(config=config)
  return state
@classmethod
def __add_kickoff_task(cls, job_config, mapreduce_spec):
  """Add kickoff task to taskqueue.

  If job_config has a hooks class, its enqueue_kickoff_task gets the first
  chance to enqueue the task. If the hooks class is absent, or raises
  NotImplementedError, the task is added to job_config.queue_name with
  transactional=True.

  Args:
    job_config: map_job.JobConfig.
    mapreduce_spec: model.MapreduceSpec.
  """
  params = {"mapreduce_id": job_config.job_id}
  kickoff_task = taskqueue.Task(
      url=job_config._base_path + "/kickoffjob_callback/" + job_config.job_id,
      headers=util._get_task_headers(job_config.job_id),
      params=params)
  if job_config._hooks_cls:
    hooks = job_config._hooks_cls(mapreduce_spec)
    try:
      hooks.enqueue_kickoff_task(kickoff_task, job_config.queue_name)
      # The hooks enqueued the task; do not add it a second time below.
      return
    except NotImplementedError:
      # The hooks class does not handle enqueueing; use the default below.
      pass
  kickoff_task.add(job_config.queue_name, transactional=True)