KVM test: tests_base.cfg. sample: Fix test dependencies
[autotest-zwu.git] / scheduler / drone_utility.py
blobc84a033de08f2f09d667c3527fc735dde6cf58c0
1 #!/usr/bin/python
3 import pickle, subprocess, os, shutil, socket, sys, time, signal, getpass
4 import datetime, traceback, tempfile, itertools, logging
5 import common
6 from autotest_lib.client.common_lib import utils, global_config, error
7 from autotest_lib.server import hosts, subcommand
8 from autotest_lib.scheduler import email_manager, scheduler_config
10 # An environment variable we add to the environment to enable us to
11 # distinguish processes we started from those that were started by
12 # something else during recovery. Name credit goes to showard. ;)
13 DARK_MARK_ENVIRONMENT_VAR = 'AUTOTEST_SCHEDULER_DARK_MARK'
15 _TEMPORARY_DIRECTORY = 'drone_tmp'
16 _TRANSFER_FAILED_FILE = '.transfer_failed'
19 class _MethodCall(object):
20 def __init__(self, method, args, kwargs):
21 self._method = method
22 self._args = args
23 self._kwargs = kwargs
26 def execute_on(self, drone_utility):
27 method = getattr(drone_utility, self._method)
28 return method(*self._args, **self._kwargs)
31 def __str__(self):
32 args = ', '.join(repr(arg) for arg in self._args)
33 kwargs = ', '.join('%s=%r' % (key, value) for key, value in
34 self._kwargs.iteritems())
35 full_args = ', '.join(item for item in (args, kwargs) if item)
36 return '%s(%s)' % (self._method, full_args)
def call(method, *args, **kwargs):
    """
    Package a method name and its arguments for later execution.

    @param method: name of the method to invoke.
    @param args, kwargs: arguments to invoke it with.
    @returns a _MethodCall holding the name and arguments.
    """
    deferred_call = _MethodCall(method, args, kwargs)
    return deferred_call
class DroneUtility(object):
    """
    This class executes actual OS calls on the drone machine.

    All paths going into and out of this class are absolute.
    """
    # execute_calls() records a warning when a batch of calls takes longer
    # than this many seconds.
    _WARNING_DURATION = 60

    def __init__(self):
        # Tattoo ourselves so that all of our spawn bears our mark.
        os.putenv(DARK_MARK_ENVIRONMENT_VAR, str(os.getpid()))

        # Warning strings collected since execute_calls() last returned.
        self.warnings = []
        # Objects created by run_async_command() that have not been waited
        # for (or found finished) yet.
        self._subcommands = []


    def initialize(self, results_dir):
        """
        Recreate an empty temporary directory under results_dir, then run
        the build_externals.py script found at ../utils relative to it.
        """
        temporary_directory = os.path.join(results_dir, _TEMPORARY_DIRECTORY)
        if os.path.exists(temporary_directory):
            shutil.rmtree(temporary_directory)
        self._ensure_directory_exists(temporary_directory)
        build_extern_cmd = os.path.join(results_dir,
                                        '../utils/build_externals.py')
        utils.run(build_extern_cmd)


    def _warn(self, warning):
        """Queue a warning string; execute_calls() returns and clears them."""
        self.warnings.append(warning)


    @staticmethod
    def _check_pid_for_dark_mark(pid, open=open):
        """
        Check whether the environment of process pid mentions our mark.

        @param pid: process id whose /proc/<pid>/environ is inspected.
        @param open: callable used to open the file; exists for test
                injection.
        @returns True if DARK_MARK_ENVIRONMENT_VAR occurs in the environ
                data, False if it does not or the file cannot be opened.
        """
        try:
            env_file = open('/proc/%s/environ' % pid, 'rb')
        except EnvironmentError:
            return False
        try:
            env_data = env_file.read()
        finally:
            env_file.close()
        return DARK_MARK_ENVIRONMENT_VAR in env_data


    # Columns requested from ps; also the keys of each process info dict.
    _PS_ARGS = ('pid', 'pgid', 'ppid', 'comm', 'args')


    @classmethod
    def _get_process_info(cls):
        """
        @returns A generator of dicts with cls._PS_ARGS as keys and
                string values each representing a running process.
        """
        ps_proc = subprocess.Popen(
            ['/bin/ps', 'x', '-o', ','.join(cls._PS_ARGS)],
            stdout=subprocess.PIPE)
        ps_output = ps_proc.communicate()[0]

        # split each line into the columns output by ps; the last column
        # (args) keeps its embedded whitespace because of the split limit.
        # Every output line is parsed, including any header row ps prints.
        split_lines = [line.split(None, 4) for line in ps_output.splitlines()]
        return (dict(zip(cls._PS_ARGS, line_components))
                for line_components in split_lines)


    def _refresh_processes(self, command_name, open=open,
                           site_check_parse=None):
        """
        List the running processes whose command name is command_name.

        @param command_name: value the ps 'comm' column must equal.
        @param open: passed to _check_pid_for_dark_mark(); used for test
                injection.
        @param site_check_parse: optional callable taking a process info
                dict; a true result also selects that process.
        @returns a list of process info dicts. When the SCHEDULER config
                option check_processes_for_dark_mark is enabled, processes
                lacking the dark mark are left out and a warning is
                recorded for each.
        """
        # The open argument is used for test injection.
        check_mark = global_config.global_config.get_config_value(
            'SCHEDULER', 'check_processes_for_dark_mark', bool, False)
        processes = []
        for info in self._get_process_info():
            is_parse = (site_check_parse and site_check_parse(info))
            if info['comm'] == command_name or is_parse:
                if (check_mark and not
                        self._check_pid_for_dark_mark(info['pid'], open=open)):
                    self._warn('%(comm)s process pid %(pid)s has no '
                               'dark mark; ignoring.' % info)
                    continue
                processes.append(info)

        return processes


    def _read_pidfiles(self, pidfile_paths):
        """
        Read every pidfile that currently exists.

        @param pidfile_paths: iterable of paths to look at.
        @returns a dict mapping each readable path to its contents; paths
                that do not exist or fail with IOError are left out.
        """
        pidfiles = {}
        for pidfile_path in pidfile_paths:
            if not os.path.exists(pidfile_path):
                continue
            try:
                file_object = open(pidfile_path, 'r')
                # Close the handle even when read() fails.
                try:
                    pidfiles[pidfile_path] = file_object.read()
                finally:
                    file_object.close()
            except IOError:
                continue
        return pidfiles


    def refresh(self, pidfile_paths):
        """
        pidfile_paths should be a list of paths to check for pidfiles.

        Returns a dict containing:
        * pidfiles: dict mapping pidfile paths to file contents, for pidfiles
          that exist.
        * autoserv_processes: list of dicts corresponding to running autoserv
          processes.  each dict contain pid, pgid, ppid, comm, and args (see
          "man ps" for details).
        * parse_processes: likewise, for parse processes.
        * pidfiles_second_read: same info as pidfiles, but gathered after the
          processes are scanned.
        """
        site_check_parse = utils.import_site_function(
            __file__, 'autotest_lib.scheduler.site_drone_utility',
            'check_parse', lambda x: False)
        # The entries are evaluated top to bottom, so the two pidfile reads
        # bracket the process scans.
        results = {
            'pidfiles' : self._read_pidfiles(pidfile_paths),
            'autoserv_processes' : self._refresh_processes('autoserv'),
            'parse_processes' : self._refresh_processes(
                'parse', site_check_parse=site_check_parse),
            'pidfiles_second_read' : self._read_pidfiles(pidfile_paths),
        }
        return results


    def kill_process(self, process):
        """
        Hand process.pid to utils.nuke_pid() with the signal queue
        SIGCONT, SIGTERM, SIGKILL.
        """
        signal_queue = (signal.SIGCONT, signal.SIGTERM, signal.SIGKILL)
        utils.nuke_pid(process.pid, signal_queue=signal_queue)


    def _convert_old_host_log(self, log_path):
        """
        For backwards compatibility only.  This can safely be removed in the
        future.

        The scheduler used to create files at results/hosts/<hostname>, and
        append all host logs to that file.  Now, it creates directories at
        results/hosts/<hostname>, and places individual timestamped log files
        into that directory.

        This can be a problem the first time the scheduler runs after
        upgrading.  To work around that, we'll look for a file at the path
        where the directory should be, and if we find one, we'll
        automatically convert it to a directory containing the old logfile.
        """
        # move the file out of the way
        temp_dir = tempfile.mkdtemp(suffix='.convert_host_log')
        base_name = os.path.basename(log_path)
        temp_path = os.path.join(temp_dir, base_name)
        os.rename(log_path, temp_path)

        os.mkdir(log_path)

        # and move it into the new directory
        os.rename(temp_path, os.path.join(log_path, 'old_log'))
        os.rmdir(temp_dir)


    def _ensure_directory_exists(self, path):
        """
        Make sure path is a directory, creating it (and parents) if needed.

        A regular file found at a path containing '/hosts/' is treated as an
        old-style host log and converted into a directory.

        @raises IOError if path exists as a file anywhere else.
        """
        if os.path.isdir(path):
            return

        if os.path.exists(path):
            # path exists already, but as a file, not a directory
            if '/hosts/' in path:
                self._convert_old_host_log(path)
                return
            else:
                raise IOError('Path %s exists as a file, not a directory'
                              % path)

        os.makedirs(path)


    def execute_command(self, command, working_directory, log_file,
                        pidfile_name):
        """
        Start command without waiting for it to finish.

        @param command: passed unchanged to subprocess.Popen.
        @param working_directory: directory that is created if missing and
                in which a stale pidfile is looked for.
        @param log_file: path the command's stdout and stderr are appended
                to, after a header naming the command; when empty or
                unopenable, output goes to /dev/null instead.
        @param pidfile_name: name of the pidfile inside working_directory;
                an existing one is removed (with a warning) first.
        """
        out_file = None
        in_devnull = None
        try:
            if log_file:
                self._ensure_directory_exists(os.path.dirname(log_file))
                try:
                    out_file = open(log_file, 'a')
                    separator = ('*' * 80) + '\n'
                    out_file.write('\n' + separator)
                    out_file.write("%s> %s\n" % (time.strftime("%X %x"),
                                                 command))
                    out_file.write(separator)
                except (OSError, IOError):
                    email_manager.manager.log_stacktrace(
                        'Error opening log file %s' % log_file)

            if out_file is None:
                out_file = open('/dev/null', 'w')

            in_devnull = open('/dev/null', 'r')

            self._ensure_directory_exists(working_directory)
            pidfile_path = os.path.join(working_directory, pidfile_name)
            if os.path.exists(pidfile_path):
                self._warn('Pidfile %s already exists' % pidfile_path)
                os.remove(pidfile_path)

            subprocess.Popen(command, stdout=out_file,
                             stderr=subprocess.STDOUT, stdin=in_devnull)
        finally:
            # Our copies of the handles are no longer needed once the child
            # has been started, and must not leak if starting it failed.
            if out_file is not None:
                out_file.close()
            if in_devnull is not None:
                in_devnull.close()


    def write_to_file(self, file_path, contents):
        """
        Append contents to file_path, creating parent directories first.

        An IOError while opening or writing is recorded as a warning
        instead of being raised.
        """
        self._ensure_directory_exists(os.path.dirname(file_path))
        try:
            file_object = open(file_path, 'a')
            # Close the handle even when write() fails.
            try:
                file_object.write(contents)
            finally:
                file_object.close()
        except IOError:
            # Fetch the exception this way so the syntax is valid on every
            # Python version.
            exc = sys.exc_info()[1]
            self._warn('Error write to file %s: %s' % (file_path, exc))


    def copy_file_or_directory(self, source_path, destination_path):
        """
        This interface is designed to match server.hosts.abstract_ssh.get_file
        (and send_file).  That is, if the source_path ends with a slash, the
        contents of the directory are copied; otherwise, the directory itself
        is copied.
        """
        if self._same_file(source_path, destination_path):
            return
        self._ensure_directory_exists(os.path.dirname(destination_path))
        if source_path.endswith('/'):
            # copying a directory's contents to another directory
            assert os.path.isdir(source_path)
            assert os.path.isdir(destination_path)
            for filename in os.listdir(source_path):
                self.copy_file_or_directory(
                    os.path.join(source_path, filename),
                    os.path.join(destination_path, filename))
        elif os.path.isdir(source_path):
            shutil.copytree(source_path, destination_path, symlinks=True)
        elif os.path.islink(source_path):
            # copied from shutil.copytree()
            link_to = os.readlink(source_path)
            os.symlink(link_to, destination_path)
        else:
            shutil.copy(source_path, destination_path)


    def _same_file(self, source_path, destination_path):
        """Checks if the source and destination are the same

        Returns True if the destination is the same as the source, False
        otherwise. Also returns False if the destination does not exist.
        """
        if not os.path.exists(destination_path):
            return False
        return os.path.samefile(source_path, destination_path)


    def wait_for_all_async_commands(self):
        """Wait for every outstanding async command, then forget them."""
        for subproc in self._subcommands:
            subproc.fork_waitfor()
        self._subcommands = []


    def _poll_async_commands(self):
        """Keep only the async commands whose poll() still returns None."""
        still_running = []
        for subproc in self._subcommands:
            if subproc.poll() is None:
                still_running.append(subproc)
        self._subcommands = still_running


    def _wait_for_some_async_commands(self):
        """
        Block, polling once a second, until fewer than
        max_transfer_processes async commands remain outstanding.
        """
        self._poll_async_commands()
        max_processes = scheduler_config.config.max_transfer_processes
        while len(self._subcommands) >= max_processes:
            time.sleep(1)
            self._poll_async_commands()


    def run_async_command(self, function, args):
        """
        Wrap function and args in a subcommand, track it and start it with
        fork_start().
        """
        subproc = subcommand.subcommand(function, args)
        self._subcommands.append(subproc)
        subproc.fork_start()


    def _sync_get_file_from(self, hostname, source_path, destination_path):
        """
        Fetch source_path from hostname into destination_path, creating the
        local parent directory first.
        """
        self._ensure_directory_exists(os.path.dirname(destination_path))
        host = create_host(hostname)
        host.get_file(source_path, destination_path, delete_dest=True)


    def get_file_from(self, hostname, source_path, destination_path):
        """Run _sync_get_file_from() as an async command."""
        self.run_async_command(self._sync_get_file_from,
                               (hostname, source_path, destination_path))


    def sync_send_file_to(self, hostname, source_path, destination_path,
                          can_fail):
        """
        Send source_path to destination_path on hostname.

        @param can_fail: when false, an AutoservError from the transfer is
                re-raised.  When true, the failure is recorded locally
                instead: a directory source gets a _TRANSFER_FAILED_FILE
                written inside it describing the failure, any other source
                is copied to destination_path + _TRANSFER_FAILED_FILE.
        """
        host = create_host(hostname)
        try:
            host.run('mkdir -p ' + os.path.dirname(destination_path))
            host.send_file(source_path, destination_path, delete_dest=True)
        except error.AutoservError:
            if not can_fail:
                raise

            if os.path.isdir(source_path):
                failed_file = os.path.join(source_path, _TRANSFER_FAILED_FILE)
                file_object = open(failed_file, 'w')
                try:
                    file_object.write('%s:%s\n%s\n%s' %
                                      (hostname, destination_path,
                                       datetime.datetime.now(),
                                       traceback.format_exc()))
                finally:
                    file_object.close()
            else:
                copy_to = destination_path + _TRANSFER_FAILED_FILE
                self._ensure_directory_exists(os.path.dirname(copy_to))
                self.copy_file_or_directory(source_path, copy_to)


    def send_file_to(self, hostname, source_path, destination_path,
                     can_fail=False):
        """Run sync_send_file_to() as an async command."""
        self.run_async_command(self.sync_send_file_to,
                               (hostname, source_path, destination_path,
                                can_fail))


    def _report_long_execution(self, calls, duration):
        """
        Record a warning giving the duration and how many times each method
        was called.
        """
        call_count = {}
        # Named method_call so the module-level call() is not shadowed.
        for method_call in calls:
            method = method_call._method
            call_count[method] = call_count.get(method, 0) + 1
        call_summary = '\n'.join('%d %s' % (count, method)
                                 for method, count in call_count.items())
        self._warn('Execution took %f sec\n%s' % (duration, call_summary))


    def execute_calls(self, calls):
        """
        Execute each call in order against this object.

        Before moving on to the next call, waits whenever the number of
        outstanding async commands has reached max_transfer_processes; all
        async commands are waited for before returning.

        @param calls: sequence of objects providing execute_on() and a
                _method attribute.
        @returns a dict with 'results' (one return value per call, in order)
                and 'warnings' (the warnings recorded meanwhile, which are
                then cleared).
        """
        results = []
        start_time = time.time()
        max_processes = scheduler_config.config.max_transfer_processes
        for method_call in calls:
            results.append(method_call.execute_on(self))
            if len(self._subcommands) >= max_processes:
                self._wait_for_some_async_commands()
        self.wait_for_all_async_commands()

        duration = time.time() - start_time
        if duration > self._WARNING_DURATION:
            self._report_long_execution(calls, duration)

        warnings = self.warnings
        self.warnings = []
        return dict(results=results, warnings=warnings)
def create_host(hostname):
    """
    Build an SSHHost for hostname.

    The login name is read from the SCHEDULER config option
    '<hostname>_username'; the name of the current user is passed as the
    default for that lookup.
    """
    config = global_config.global_config
    ssh_user = config.get_config_value('SCHEDULER', hostname + '_username',
                                       default=getpass.getuser())
    return hosts.SSHHost(hostname, user=ssh_user)
def parse_input():
    """
    Read all of standard input and unpickle it.

    NOTE(review): pickle.loads() can execute arbitrary code while loading,
    so this is only safe as long as whatever writes to our stdin is
    trusted.

    @returns the object unpickled from stdin.
    @raises ValueError if unpickling fails for any reason; the message
            includes the raw input and the traceback of the original error.
    """
    # Keep reading until read() comes back empty.
    input_chunks = []
    chunk_of_input = sys.stdin.read()
    while chunk_of_input:
        input_chunks.append(chunk_of_input)
        chunk_of_input = sys.stdin.read()
    pickled_input = ''.join(input_chunks)

    try:
        return pickle.loads(pickled_input)
    except Exception:
        # The exception object itself is not needed: format_exc() below
        # reports the error currently being handled.
        separator = '*' * 50
        raise ValueError('Unpickling input failed\n'
                         'Input: %r\n'
                         'Exception from pickle:\n'
                         '%s\n%s\n%s' %
                         (pickled_input, separator, traceback.format_exc(),
                          separator))
def return_data(data):
    """Write the pickled form of data, plus a newline, to standard output."""
    serialized = pickle.dumps(data)
    sys.stdout.write(serialized + '\n')
def main():
    """
    Entry point: unpickle the calls from stdin, execute them on a fresh
    DroneUtility and write the pickled outcome to stdout.
    """
    pending_calls = parse_input()
    outcome = DroneUtility().execute_calls(pending_calls)
    return_data(outcome)


if __name__ == '__main__':
    main()