frontend.afe.model_logic: Rename validate_unique
[autotest-zwu.git] / frontend / planner / execution_engine.py
blobe1b0dad10c2bec27115b06194750ab38cc334e56
1 import time, logging
2 from autotest_lib.frontend.afe import model_attributes as afe_model_attributes
3 from autotest_lib.frontend.shared import rest_client
4 from autotest_lib.frontend.planner import model_attributes, support
5 from autotest_lib.server import frontend
# Seconds slept between consecutive polls: between engine ticks, between
# retries of the atomic group job, and while waiting for plan initialization.
TICK_INTERVAL_SECS = 10
# Seconds slept after the engine catches an exception, before it starts over.
PAUSE_BEFORE_RESTARTING_SECS = 60
11 class ExecutionEngine(object):
12 """
13 Provides the Test Planner execution engine
14 """
16 _planner_rpc = frontend.Planner()
17 _tko_rpc = frontend.TKO()
19 def __init__(self, plan_id, server, label_name, owner):
20 self._plan_id = plan_id
21 self._server = server
22 self._label_name = label_name
23 self._owner = owner
24 self._afe_rest = rest_client.Resource.load(
25 'http://%s/afe/server/resources' % server)
28 def start(self):
29 """
30 Starts the execution engine.
32 Thread remains in this method until the execution engine is complete.
33 """
34 while True:
35 try:
36 self._initialize_plan()
38 while not self._tick():
39 time.sleep(TICK_INTERVAL_SECS)
41 self._cleanup()
42 break
43 except Exception, e:
44 logging.error('Execution engine caught exception, restarting:'
45 '\n%s', e)
46 time.sleep(PAUSE_BEFORE_RESTARTING_SECS)
49 def _initialize_plan(self):
50 """
51 Performs actions necessary to start a test plan.
53 Adds the hosts into the proper atomic group, and waits for the plan to
54 be ready to start before returning
55 """
56 plan = self._planner_rpc.run('get_plan', id=self._plan_id)
57 name = plan['name'] + '_set_atomic_group'
58 if not self._afe_rest.jobs.get(name=name).total_results:
59 self._launch_set_atomic_group_job(name)
61 self._wait_for_initialization()
64 def _launch_set_atomic_group_job(self, name):
65 """
66 Launch the job to set the hosts' atomic group, and initate the plan
68 If the hosts are already part of an atomic group, wait for a tick and
69 try again. Return when successful
70 """
71 while True:
72 hosts = self._planner_rpc.run('get_hosts', plan_id=self._plan_id)
73 control = (self._planner_rpc.run('get_atomic_group_control_file') %
74 dict(server=self._server, label_name=self._label_name,
75 plan_id=self._plan_id))
77 info = self._afe_rest.execution_info.get().execution_info
78 info['control_file'] = control
79 info['cleanup_before_job'] = afe_model_attributes.RebootBefore.NEVER
80 info['cleanup_after_job'] = afe_model_attributes.RebootAfter.NEVER
81 info['run_verify'] = False
82 info['machines_per_execution'] = len(hosts)
84 entries = self._afe_rest.queue_entries_request.get(
85 hosts=hosts).queue_entries
87 job_req = {'name' : name,
88 'owner': self._owner,
89 'execution_info' : info,
90 'queue_entries' : entries}
92 try:
93 self._afe_rest.jobs.post(job_req)
94 logging.info('created job to set atomic group')
95 break
96 except rest_client.ClientError, e:
97 logging.info('hosts already in atomic group')
98 logging.info('(error was %s)' % e.message)
99 logging.info('waiting...')
100 time.sleep(TICK_INTERVAL_SECS)
103 def _wait_for_initialization(self):
104 while True:
105 plan = self._planner_rpc.run('get_plan', id=self._plan_id)
106 if plan['initialized']:
107 break
108 logging.info('waiting for initialization...')
109 time.sleep(TICK_INTERVAL_SECS)
112 def _cleanup(self):
113 self._afe_rest.labels.get(name=self._label_name).members[0].delete()
116 def _tick(self):
118 Processes one tick of the execution engine.
120 Returns True if the engine has completed the plan.
122 logging.info('tick')
123 self._process_finished_runs()
124 self._check_tko_jobs()
125 return self._schedule_new_runs()
128 def _process_finished_runs(self):
130 Finalize the test runs that have finished.
132 Look for runs that are in PASSED or FAILED, perform any additional
133 processing required, and set the entry to 'finalized'.
135 Status = model_attributes.TestRunStatus
136 runs = self._planner_rpc.run('get_test_runs', plan__id=self._plan_id,
137 status__in=(Status.PASSED, Status.FAILED),
138 finalized=False)
139 for run in runs:
140 logging.info('finalizing test run %s', run)
142 controller = support.TestPlanController(
143 machine=run['host']['host'],
144 test_alias=run['test_job']['test_config']['alias'])
145 self._run_execute_after(controller, tko_test_id=run['tko_test'],
146 success=(run['status'] == Status.PASSED))
148 if controller._fail:
149 raise NotImplemented('TODO: implement forced failure')
151 failed = (run['status'] == Status.FAILED or controller._fail)
152 if failed and not controller._unblock:
153 self._planner_rpc.run('modify_host', id=run['host']['id'],
154 blocked=True)
155 self._planner_rpc.run('modify_test_run', id=run['id'],
156 finalized=True)
159 def _check_tko_jobs(self):
161 Instructs the server to update the Planner test runs table
163 Sends an RPC to have the server pull the proper TKO tests and add them
164 to the Planner tables. Logs information about what was added.
166 test_runs_updated = self._planner_rpc.run('update_test_runs',
167 plan_id=self._plan_id)
168 for update in test_runs_updated:
169 logging.info('added %s test run for tko test id %s (%s)',
170 update['status'], update['tko_test_idx'],
171 update['hostname'])
174 def _schedule_new_runs(self):
175 next_configs = self._planner_rpc.run('get_next_test_configs',
176 plan_id=self._plan_id)
177 if next_configs['complete']:
178 return True
180 for config in next_configs['next_configs']:
181 config_id = config['next_test_config_id']
182 controller = support.TestPlanController(
183 machine=config['host'],
184 test_alias=config['next_test_config_alias'])
185 self._run_execute_before(controller)
186 if controller._skip:
187 self._planner_rpc.run('skip_test', test_config_id=config_id,
188 hostname=config['host'])
189 continue
191 self._run_job(hostname=config['host'],
192 test_config_id=config_id,
193 cleanup_before_job=controller._reboot_before,
194 cleanup_after_job=controller._reboot_after,
195 run_verify=controller._run_verify)
197 return False
200 def _run_job(self, hostname, test_config_id, cleanup_before_job,
201 cleanup_after_job, run_verify):
202 if run_verify is None:
203 run_verify = True
205 test_config = self._planner_rpc.run('get_wrapped_test_config',
206 id=test_config_id,
207 hostname=hostname,
208 run_verify=run_verify)
210 info = self._afe_rest.execution_info.get().execution_info
211 info['control_file'] = test_config['wrapped_control_file']
212 info['is_server'] = True
213 info['cleanup_before_job'] = cleanup_before_job
214 info['cleanup_after_job'] = cleanup_after_job
215 info['run_verify'] = False
217 atomic_group_class = self._afe_rest.labels.get(
218 name=self._label_name).members[0].get().atomic_group_class.href
220 request = self._afe_rest.queue_entries_request.get(
221 hosts=(hostname,), atomic_group_class=atomic_group_class)
222 entries = request.queue_entries
224 plan = self._planner_rpc.run('get_plan', id=self._plan_id)
225 prefix = plan['label_override']
226 if prefix is None:
227 prefix = plan['name']
228 job_req = {'name' : '%s_%s_%s' % (prefix, test_config['alias'],
229 hostname),
230 'owner': self._owner,
231 'execution_info' : info,
232 'queue_entries' : entries}
234 logging.info('starting test alias %s for host %s',
235 test_config['alias'], hostname)
236 job = self._afe_rest.jobs.post(job_req)
237 self._planner_rpc.run('add_job',
238 plan_id=self._plan_id,
239 test_config_id=test_config_id,
240 afe_job_id=job.get().id)
243 def _run_execute_before(self, controller):
245 Execute the global support's execute_before() for the plan
247 self._run_global_support(controller, 'execute_before')
250 def _run_execute_after(self, controller, tko_test_id, success):
252 Execute the global support's execute_after() for the plan
254 self._run_global_support(controller, 'execute_after',
255 tko_test_id=tko_test_id, success=success)
258 def _run_global_support(self, controller, function_name, **kwargs):
259 plan = self._planner_rpc.run('get_plan', id=self._plan_id)
260 if plan['support']:
261 context = {'model_attributes': afe_model_attributes}
262 exec plan['support'] in context
263 function = context.get(function_name)
264 if function:
265 if not callable(function):
266 raise Exception('Global support defines %s, but it is not '
267 'callable' % function_name)
268 function(controller, **kwargs)