import pickle, subprocess, os, shutil, socket, sys, time, signal, getpass
import datetime, traceback, tempfile, itertools, logging
import common
from autotest_lib.client.common_lib import utils, global_config, error
from autotest_lib.server import hosts, subcommand
from autotest_lib.scheduler import email_manager, scheduler_config


# An environment variable we add to the environment to enable us to
# distinguish processes we started from those that were started by
# something else during recovery.  Name credit goes to showard. ;)
DARK_MARK_ENVIRONMENT_VAR = 'AUTOTEST_SCHEDULER_DARK_MARK'

_TEMPORARY_DIRECTORY = 'drone_tmp'
_TRANSFER_FAILED_FILE = '.transfer_failed'


class _MethodCall(object):
    def __init__(self, method, args, kwargs):
        self._method = method
        self._args = args
        self._kwargs = kwargs


    def execute_on(self, drone_utility):
        method = getattr(drone_utility, self._method)
        return method(*self._args, **self._kwargs)


    def __str__(self):
        args = ', '.join(repr(arg) for arg in self._args)
        kwargs = ', '.join('%s=%r' % (key, value) for key, value in
                           self._kwargs.iteritems())
        full_args = ', '.join(item for item in (args, kwargs) if item)
        return '%s(%s)' % (self._method, full_args)


def call(method, *args, **kwargs):
    return _MethodCall(method, args, kwargs)
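
# A minimal usage sketch (not part of the original module): the scheduler side
# builds a list of _MethodCall objects via call() and ships the pickled list
# to this script's stdin (see parse_input() and main() below).  The method
# arguments and paths in this example are illustrative only:
#
#     calls = [call('initialize', '/usr/local/autotest/results'),
#              call('refresh', ['/results/123-debug/.autoserv_execute'])]
#     payload = pickle.dumps(calls)
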
class DroneUtility(object):
    """
    This class executes actual OS calls on the drone machine.

    All paths going into and out of this class are absolute.
    """
    _WARNING_DURATION = 60

    def __init__(self):
        # Tattoo ourselves so that all of our spawn bears our mark.
        os.putenv(DARK_MARK_ENVIRONMENT_VAR, str(os.getpid()))

        self.warnings = []
        self._subcommands = []

    def initialize(self, results_dir):
        temporary_directory = os.path.join(results_dir, _TEMPORARY_DIRECTORY)
        if os.path.exists(temporary_directory):
            shutil.rmtree(temporary_directory)
        self._ensure_directory_exists(temporary_directory)
        build_extern_cmd = os.path.join(results_dir,
                                        '../utils/build_externals.py')
        utils.run(build_extern_cmd)


    def _warn(self, warning):
        self.warnings.append(warning)

    @staticmethod
    def _check_pid_for_dark_mark(pid, open=open):
        try:
            env_file = open('/proc/%s/environ' % pid, 'rb')
        except EnvironmentError:
            return False
        try:
            # /proc/<pid>/environ is a NUL-separated block of 'KEY=value'
            # entries, so a plain substring test for the variable name
            # suffices to detect the mark.
            env_data = env_file.read()
        finally:
            env_file.close()
        return DARK_MARK_ENVIRONMENT_VAR in env_data


    _PS_ARGS = ('pid', 'pgid', 'ppid', 'comm', 'args')

    @classmethod
    def _get_process_info(cls):
        """
        @returns A generator of dicts with cls._PS_ARGS as keys and
                string values each representing a running process.
        """
        ps_proc = subprocess.Popen(
                ['/bin/ps', 'x', '-o', ','.join(cls._PS_ARGS)],
                stdout=subprocess.PIPE)
        ps_output = ps_proc.communicate()[0]

        # split each line into the columns output by ps
        split_lines = [line.split(None, 4) for line in ps_output.splitlines()]
        return (dict(itertools.izip(cls._PS_ARGS, line_components))
                for line_components in split_lines)
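
    # Illustrative example (not from a real run): a ps output line such as
    #     '1234  1234     1 autoserv        /usr/bin/autoserv -P 123-debug'
    # splits into at most five columns, yielding
    #     {'pid': '1234', 'pgid': '1234', 'ppid': '1',
    #      'comm': 'autoserv', 'args': '/usr/bin/autoserv -P 123-debug'}
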
    def _refresh_processes(self, command_name, open=open,
                           site_check_parse=None):
        # The open argument is used for test injection.
        check_mark = global_config.global_config.get_config_value(
            'SCHEDULER', 'check_processes_for_dark_mark', bool, False)
        processes = []
        for info in self._get_process_info():
            is_parse = (site_check_parse and site_check_parse(info))
            if info['comm'] == command_name or is_parse:
                if (check_mark and not
                        self._check_pid_for_dark_mark(info['pid'], open=open)):
                    self._warn('%(comm)s process pid %(pid)s has no '
                               'dark mark; ignoring.' % info)
                    continue
                processes.append(info)

        return processes

    def _read_pidfiles(self, pidfile_paths):
        pidfiles = {}
        for pidfile_path in pidfile_paths:
            if not os.path.exists(pidfile_path):
                continue
            try:
                file_object = open(pidfile_path, 'r')
                pidfiles[pidfile_path] = file_object.read()
                file_object.close()
            except IOError:
                continue
        return pidfiles

    def refresh(self, pidfile_paths):
        """
        pidfile_paths should be a list of paths to check for pidfiles.

        Returns a dict containing:
        * pidfiles: dict mapping pidfile paths to file contents, for pidfiles
          that exist.
        * autoserv_processes: list of dicts corresponding to running autoserv
          processes.  each dict contains pid, pgid, ppid, comm, and args (see
          "man ps" for details).
        * parse_processes: likewise, for parse processes.
        * pidfiles_second_read: same info as pidfiles, but gathered after the
          processes are scanned.
        """
        site_check_parse = utils.import_site_function(
                __file__, 'autotest_lib.scheduler.site_drone_utility',
                'check_parse', lambda x: False)
        results = {
            'pidfiles' : self._read_pidfiles(pidfile_paths),
            'autoserv_processes' : self._refresh_processes('autoserv'),
            'parse_processes' : self._refresh_processes(
                    'parse', site_check_parse=site_check_parse),
            'pidfiles_second_read' : self._read_pidfiles(pidfile_paths),
        }
        return results
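
    # Sketch of a possible refresh() return value; all values here are
    # illustrative, not from a real run:
    #     {'pidfiles': {'/results/123-debug/.autoserv_execute': '1234\n'},
    #      'autoserv_processes': [{'pid': '1234', 'pgid': '1234', 'ppid': '1',
    #                              'comm': 'autoserv', 'args': '...'}],
    #      'parse_processes': [],
    #      'pidfiles_second_read': {'/results/123-debug/.autoserv_execute':
    #                               '1234\n'}}
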
    def kill_process(self, process):
        # escalate: wake the process, ask it to exit, then force-kill
        signal_queue = (signal.SIGCONT, signal.SIGTERM, signal.SIGKILL)
        utils.nuke_pid(process.pid, signal_queue=signal_queue)

    def _convert_old_host_log(self, log_path):
        """
        For backwards compatibility only.  This can safely be removed in the
        future.

        The scheduler used to create files at results/hosts/<hostname>, and
        append all host logs to that file.  Now, it creates directories at
        results/hosts/<hostname>, and places individual timestamped log files
        into that directory.

        This can be a problem the first time the scheduler runs after upgrading.
        To work around that, we'll look for a file at the path where the
        directory should be, and if we find one, we'll automatically convert it
        to a directory containing the old logfile.
        """
        # move the file out of the way
        temp_dir = tempfile.mkdtemp(suffix='.convert_host_log')
        base_name = os.path.basename(log_path)
        temp_path = os.path.join(temp_dir, base_name)
        os.rename(log_path, temp_path)

        # create a directory where the file used to be
        os.mkdir(log_path)

        # and move it into the new directory
        os.rename(temp_path, os.path.join(log_path, 'old_log'))
        os.rmdir(temp_dir)

    def _ensure_directory_exists(self, path):
        if os.path.isdir(path):
            return

        if os.path.exists(path):
            # path exists already, but as a file, not a directory
            if '/hosts/' in path:
                self._convert_old_host_log(path)
                return
            raise IOError('Path %s exists as a file, not a directory' % path)

        os.makedirs(path)

    def execute_command(self, command, working_directory, log_file,
                        pidfile_name):
        out_file = None
        if log_file:
            self._ensure_directory_exists(os.path.dirname(log_file))
            try:
                out_file = open(log_file, 'a')
                separator = ('*' * 80) + '\n'
                out_file.write('\n' + separator)
                out_file.write("%s> %s\n" % (time.strftime("%X %x"), command))
                out_file.write(separator)
            except (OSError, IOError):
                email_manager.manager.log_stacktrace(
                    'Error opening log file %s' % log_file)

        if not out_file:
            out_file = open('/dev/null', 'w')

        in_devnull = open('/dev/null', 'r')

        self._ensure_directory_exists(working_directory)
        pidfile_path = os.path.join(working_directory, pidfile_name)
        if os.path.exists(pidfile_path):
            self._warn('Pidfile %s already exists' % pidfile_path)
            os.remove(pidfile_path)

        # the child inherits the file descriptors, so close our copies
        subprocess.Popen(command, stdout=out_file, stderr=subprocess.STDOUT,
                         stdin=in_devnull)
        out_file.close()
        in_devnull.close()

    def write_to_file(self, file_path, contents):
        self._ensure_directory_exists(os.path.dirname(file_path))
        try:
            file_object = open(file_path, 'a')
            file_object.write(contents)
            file_object.close()
        except IOError, exc:
            self._warn('Error writing to file %s: %s' % (file_path, exc))

    def copy_file_or_directory(self, source_path, destination_path):
        """
        This interface is designed to match server.hosts.abstract_ssh.get_file
        (and send_file).  That is, if the source_path ends with a slash, the
        contents of the directory are copied; otherwise, the directory itself
        is copied.
        """
        if self._same_file(source_path, destination_path):
            return
        self._ensure_directory_exists(os.path.dirname(destination_path))
        if source_path.endswith('/'):
            # copying a directory's contents to another directory
            assert os.path.isdir(source_path)
            assert os.path.isdir(destination_path)
            for filename in os.listdir(source_path):
                self.copy_file_or_directory(
                    os.path.join(source_path, filename),
                    os.path.join(destination_path, filename))
        elif os.path.isdir(source_path):
            shutil.copytree(source_path, destination_path, symlinks=True)
        elif os.path.islink(source_path):
            # copied from shutil.copytree()
            link_to = os.readlink(source_path)
            os.symlink(link_to, destination_path)
        else:
            shutil.copy(source_path, destination_path)
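
    # Illustrative examples of the trailing-slash convention (paths are
    # hypothetical):
    #     copy_file_or_directory('/results/123/', '/archive/123')
    #         copies the entries *inside* /results/123 into the existing
    #         directory /archive/123, one at a time.
    #     copy_file_or_directory('/results/123', '/archive/123')
    #         copies the directory itself via shutil.copytree, creating
    #         /archive/123.
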
    def _same_file(self, source_path, destination_path):
        """Checks if the source and destination are the same

        Returns True if the destination is the same as the source, False
        otherwise.  Also returns False if the destination does not exist.
        """
        if not os.path.exists(destination_path):
            return False
        return os.path.samefile(source_path, destination_path)

    def wait_for_all_async_commands(self):
        for subproc in self._subcommands:
            subproc.fork_waitfor()
        self._subcommands = []

    def _poll_async_commands(self):
        still_running = []
        for subproc in self._subcommands:
            if subproc.poll() is None:
                still_running.append(subproc)
        self._subcommands = still_running

    def _wait_for_some_async_commands(self):
        self._poll_async_commands()
        max_processes = scheduler_config.config.max_transfer_processes
        while len(self._subcommands) >= max_processes:
            time.sleep(1)
            self._poll_async_commands()

    def run_async_command(self, function, args):
        subproc = subcommand.subcommand(function, args)
        self._subcommands.append(subproc)
        subproc.fork_start()

    def _sync_get_file_from(self, hostname, source_path, destination_path):
        self._ensure_directory_exists(os.path.dirname(destination_path))
        host = create_host(hostname)
        host.get_file(source_path, destination_path, delete_dest=True)


    def get_file_from(self, hostname, source_path, destination_path):
        self.run_async_command(self._sync_get_file_from,
                               (hostname, source_path, destination_path))

    def sync_send_file_to(self, hostname, source_path, destination_path,
                          can_fail):
        host = create_host(hostname)
        try:
            host.run('mkdir -p ' + os.path.dirname(destination_path))
            host.send_file(source_path, destination_path, delete_dest=True)
        except error.AutoservError:
            if not can_fail:
                raise

            if os.path.isdir(source_path):
                failed_file = os.path.join(source_path, _TRANSFER_FAILED_FILE)
                file_object = open(failed_file, 'w')
                try:
                    file_object.write('%s:%s\n%s\n%s' %
                                      (hostname, destination_path,
                                       datetime.datetime.now(),
                                       traceback.format_exc()))
                finally:
                    file_object.close()
            else:
                copy_to = destination_path + _TRANSFER_FAILED_FILE
                self._ensure_directory_exists(os.path.dirname(copy_to))
                self.copy_file_or_directory(source_path, copy_to)

    def send_file_to(self, hostname, source_path, destination_path,
                     can_fail):
        self.run_async_command(self.sync_send_file_to,
                               (hostname, source_path, destination_path,
                                can_fail))

    def _report_long_execution(self, calls, duration):
        call_count = {}
        for call in calls:
            call_count.setdefault(call._method, 0)
            call_count[call._method] += 1
        call_summary = '\n'.join('%d %s' % (count, method)
                                 for method, count in call_count.iteritems())
        self._warn('Execution took %f sec\n%s' % (duration, call_summary))

    def execute_calls(self, calls):
        results = []
        start_time = time.time()
        max_processes = scheduler_config.config.max_transfer_processes
        for method_call in calls:
            results.append(method_call.execute_on(self))
            if len(self._subcommands) >= max_processes:
                self._wait_for_some_async_commands()
        self.wait_for_all_async_commands()

        duration = time.time() - start_time
        if duration > self._WARNING_DURATION:
            self._report_long_execution(calls, duration)

        warnings = self.warnings
        self.warnings = []
        return dict(results=results, warnings=warnings)


def create_host(hostname):
    username = global_config.global_config.get_config_value(
        'SCHEDULER', hostname + '_username', default=getpass.getuser())
    return hosts.SSHHost(hostname, user=username)
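
# The per-drone username lookup above means an entry like the following in the
# global config overrides the default of the current user.  The hostname and
# value here are hypothetical examples following the '<hostname>_username'
# key convention used above:
#     [SCHEDULER]
#     drone1.example.com_username: autotest
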
def parse_input():
    input_chunks = []
    chunk_of_input = sys.stdin.read()
    while chunk_of_input:
        input_chunks.append(chunk_of_input)
        chunk_of_input = sys.stdin.read()
    pickled_input = ''.join(input_chunks)

    try:
        return pickle.loads(pickled_input)
    except Exception, exc:
        separator = '*' * 50
        raise ValueError('Unpickling input failed\n'
                         'Input: %r\n'
                         'Exception from pickle:\n'
                         '%s\n%s\n%s' %
                         (pickled_input, separator, traceback.format_exc(),
                          separator))

def return_data(data):
    print pickle.dumps(data)

def main():
    calls = parse_input()
    drone_utility = DroneUtility()
    return_value = drone_utility.execute_calls(calls)
    return_data(return_value)


if __name__ == '__main__':
    main()
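
# A minimal sketch of driving this script by hand, assuming an autotest
# checkout on PYTHONPATH; the pickled call list goes in on stdin and a pickled
# dict of results and warnings comes back on stdout.  All names and paths are
# illustrative only:
#
#     import pickle, subprocess
#     payload = pickle.dumps([call('refresh', [])])
#     proc = subprocess.Popen(['python', 'drone_utility.py'],
#                             stdin=subprocess.PIPE, stdout=subprocess.PIPE)
#     output = proc.communicate(payload)[0]
#     print pickle.loads(output)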