import logging
import time

from autotest_lib.frontend.afe import model_attributes as afe_model_attributes
from autotest_lib.frontend.planner import model_attributes, support
from autotest_lib.frontend.shared import rest_client
from autotest_lib.server import frontend
# Seconds to sleep between polling iterations (engine ticks and the
# wait-and-retry loops used during plan initialization).
TICK_INTERVAL_SECS = 10

# Seconds to sleep after the engine logs an unexpected exception, before it
# restarts.
PAUSE_BEFORE_RESTARTING_SECS = 60
class ExecutionEngine(object):
    """
    Provides the Test Planner execution engine

    The engine initializes a plan and then repeatedly "ticks": each tick
    finalizes finished test runs, asks the server to update the Planner test
    runs table from TKO, and schedules the next test configs.
    """

    # NOTE(review): this class was rebuilt from a lossy listing in which some
    # source lines were not visible. Statements tagged "# inferred" below were
    # reconstructed from call sites and docstrings and must be confirmed
    # against the original file.

    # RPC proxies are class attributes, so they are created once at import
    # time and shared by all instances.
    _planner_rpc = frontend.Planner()
    _tko_rpc = frontend.TKO()


    def __init__(self, plan_id, server, label_name, owner):
        """
        @param plan_id: ID of the plan, passed to the Planner RPCs
        @param server: hostname used to build the AFE REST resource URL
        @param label_name: name of the label looked up through the AFE REST
                interface
        @param owner: value stored under 'owner' in submitted job requests
        """
        self._plan_id = plan_id
        self._server = server  # inferred: read by _launch_set_atomic_group_job
        self._label_name = label_name
        self._owner = owner  # inferred: read by _run_job
        self._afe_rest = rest_client.Resource.load(
                'http://%s/afe/server/resources' % server)


    def start(self):  # inferred: method name (def line not visible)
        """
        Starts the execution engine.

        Thread remains in this method until the execution engine is complete.
        """
        while True:  # inferred
            try:  # inferred
                self._initialize_plan()

                while not self._tick():
                    time.sleep(TICK_INTERVAL_SECS)

                self._cleanup()  # inferred
                break  # inferred
            except Exception as e:  # inferred
                logging.error('Execution engine caught exception, restarting:'
                              '\n%s', e)  # inferred: second line of the call
                time.sleep(PAUSE_BEFORE_RESTARTING_SECS)


    def _initialize_plan(self):
        """
        Performs actions necessary to start a test plan.

        Adds the hosts into the proper atomic group, and waits for the plan to
        be ready to start before returning
        """
        plan = self._planner_rpc.run('get_plan', id=self._plan_id)
        name = plan['name'] + '_set_atomic_group'
        # Only launch the job when no job with that name exists yet.
        if not self._afe_rest.jobs.get(name=name).total_results:
            self._launch_set_atomic_group_job(name)

        self._wait_for_initialization()


    def _launch_set_atomic_group_job(self, name):
        """
        Launch the job to set the hosts' atomic group, and initiate the plan

        If the hosts are already part of an atomic group, wait for a tick and
        try again. Return when successful

        @param name: name given to the submitted job
        """
        while True:  # inferred
            hosts = self._planner_rpc.run('get_hosts', plan_id=self._plan_id)
            control = (self._planner_rpc.run('get_atomic_group_control_file') %
                       dict(server=self._server, label_name=self._label_name,
                            plan_id=self._plan_id))

            info = self._afe_rest.execution_info.get().execution_info
            info['control_file'] = control
            info['cleanup_before_job'] = afe_model_attributes.RebootBefore.NEVER
            info['cleanup_after_job'] = afe_model_attributes.RebootAfter.NEVER
            info['run_verify'] = False
            info['machines_per_execution'] = len(hosts)

            entries = self._afe_rest.queue_entries_request.get(
                    hosts=hosts).queue_entries

            job_req = {'name' : name,
                       'owner': self._owner,  # inferred
                       'execution_info' : info,
                       'queue_entries' : entries}

            try:  # inferred
                self._afe_rest.jobs.post(job_req)
                logging.info('created job to set atomic group')
                break  # inferred
            except rest_client.ClientError as e:
                logging.info('hosts already in atomic group')
                logging.info('(error was %s)', e.message)
                logging.info('waiting...')
            time.sleep(TICK_INTERVAL_SECS)


    def _wait_for_initialization(self):
        """
        Polls the plan until its 'initialized' field is set, sleeping
        TICK_INTERVAL_SECS between polls.
        """
        while True:  # inferred
            plan = self._planner_rpc.run('get_plan', id=self._plan_id)
            if plan['initialized']:
                break  # inferred
            logging.info('waiting for initialization...')
            time.sleep(TICK_INTERVAL_SECS)


    def _cleanup(self):  # inferred: method name (def line not visible)
        """
        Deletes the first member of the label named self._label_name.
        """
        self._afe_rest.labels.get(name=self._label_name).members[0].delete()


    def _tick(self):
        """
        Processes one tick of the execution engine.

        Returns True if the engine has completed the plan.
        """
        logging.info('tick')  # inferred
        self._process_finished_runs()
        self._check_tko_jobs()
        return self._schedule_new_runs()


    def _process_finished_runs(self):
        """
        Finalize the test runs that have finished.

        Look for runs that are in PASSED or FAILED, perform any additional
        processing required, and set the entry to 'finalized'.
        """
        Status = model_attributes.TestRunStatus
        runs = self._planner_rpc.run('get_test_runs', plan__id=self._plan_id,
                                     status__in=(Status.PASSED, Status.FAILED),
                                     finalized=False)  # inferred: finalized=
        for run in runs:  # inferred
            logging.info('finalizing test run %s', run)

            controller = support.TestPlanController(
                    machine=run['host']['host'],
                    test_alias=run['test_job']['test_config']['alias'])
            self._run_execute_after(controller, tko_test_id=run['tko_test'],
                                    success=(run['status'] == Status.PASSED))

            if controller._fail:  # inferred
                # NotImplemented is a non-callable constant; the exception
                # type is NotImplementedError.
                raise NotImplementedError('TODO: implement forced failure')

            failed = (run['status'] == Status.FAILED or controller._fail)
            if failed and not controller._unblock:
                self._planner_rpc.run('modify_host', id=run['host']['id'],
                                      blocked=True)  # inferred: blocked=
            self._planner_rpc.run('modify_test_run', id=run['id'],
                                  finalized=True)  # inferred: finalized=


    def _check_tko_jobs(self):
        """
        Instructs the server to update the Planner test runs table

        Sends an RPC to have the server pull the proper TKO tests and add them
        to the Planner tables. Logs information about what was added.
        """
        test_runs_updated = self._planner_rpc.run('update_test_runs',
                                                  plan_id=self._plan_id)
        for update in test_runs_updated:
            logging.info('added %s test run for tko test id %s (%s)',
                         update['status'], update['tko_test_idx'],
                         update['hostname'])  # inferred: third argument


    def _schedule_new_runs(self):
        """
        Schedules the next test configs reported by the planner.

        Returns True if the planner reports the plan complete, False otherwise.
        """
        next_configs = self._planner_rpc.run('get_next_test_configs',
                                             plan_id=self._plan_id)
        if next_configs['complete']:
            return True  # inferred

        for config in next_configs['next_configs']:
            config_id = config['next_test_config_id']
            controller = support.TestPlanController(
                    machine=config['host'],
                    test_alias=config['next_test_config_alias'])
            self._run_execute_before(controller)
            if controller._skip:  # inferred
                self._planner_rpc.run('skip_test', test_config_id=config_id,
                                      hostname=config['host'])
                continue  # inferred

            self._run_job(hostname=config['host'],
                          test_config_id=config_id,
                          cleanup_before_job=controller._reboot_before,
                          cleanup_after_job=controller._reboot_after,
                          run_verify=controller._run_verify)

        return False  # inferred


    def _run_job(self, hostname, test_config_id, cleanup_before_job,
                 cleanup_after_job, run_verify):
        """
        Submits an AFE job for one test config on one host and registers it
        with the planner.

        @param hostname: host to run on
        @param test_config_id: ID of the planner test config to run
        @param cleanup_before_job: stored as the job's 'cleanup_before_job'
        @param cleanup_after_job: stored as the job's 'cleanup_after_job'
        @param run_verify: passed to 'get_wrapped_test_config'
        """
        if run_verify is None:
            run_verify = True  # inferred

        test_config = self._planner_rpc.run('get_wrapped_test_config',
                                            id=test_config_id,  # inferred
                                            hostname=hostname,  # inferred
                                            run_verify=run_verify)

        info = self._afe_rest.execution_info.get().execution_info
        info['control_file'] = test_config['wrapped_control_file']
        info['is_server'] = True
        info['cleanup_before_job'] = cleanup_before_job
        info['cleanup_after_job'] = cleanup_after_job
        # The job's own 'run_verify' is always False; the run_verify argument
        # is only passed to the 'get_wrapped_test_config' RPC above.
        info['run_verify'] = False

        atomic_group_class = self._afe_rest.labels.get(
                name=self._label_name).members[0].get().atomic_group_class.href

        request = self._afe_rest.queue_entries_request.get(
                hosts=(hostname,), atomic_group_class=atomic_group_class)
        entries = request.queue_entries

        plan = self._planner_rpc.run('get_plan', id=self._plan_id)
        prefix = plan['label_override']
        if prefix is None:  # inferred
            prefix = plan['name']
        job_req = {'name' : '%s_%s_%s' % (prefix, test_config['alias'],
                                          hostname),  # inferred: third field
                   'owner': self._owner,
                   'execution_info' : info,
                   'queue_entries' : entries}

        logging.info('starting test alias %s for host %s',
                     test_config['alias'], hostname)
        job = self._afe_rest.jobs.post(job_req)
        self._planner_rpc.run('add_job',
                              plan_id=self._plan_id,
                              test_config_id=test_config_id,
                              afe_job_id=job.get().id)


    def _run_execute_before(self, controller):
        """
        Execute the global support's execute_before() for the plan
        """
        self._run_global_support(controller, 'execute_before')


    def _run_execute_after(self, controller, tko_test_id, success):
        """
        Execute the global support's execute_after() for the plan
        """
        self._run_global_support(controller, 'execute_after',
                                 tko_test_id=tko_test_id, success=success)


    def _run_global_support(self, controller, function_name, **kwargs):
        """
        Executes the plan's 'support' code and calls one function defined by it.

        @param controller: passed as the first argument to the function
        @param function_name: name of the function to look up in the executed
                support code
        @param kwargs: forwarded to the function as keyword arguments

        Raises Exception if the support code binds function_name to something
        that is not callable.
        """
        plan = self._planner_rpc.run('get_plan', id=self._plan_id)
        if not plan['support']:  # inferred
            return

        context = {'model_attributes': afe_model_attributes}
        # SECURITY NOTE(review): this executes code stored on the plan with
        # the privileges of this process. Confirm that only trusted users can
        # set a plan's 'support' field.
        # The call form of exec parses on both Python 2 and Python 3.
        exec(plan['support'], context)
        function = context.get(function_name)
        if function:  # inferred
            if not callable(function):
                raise Exception('Global support defines %s, but it is not '
                                'callable' % function_name)
            function(controller, **kwargs)