Detect leaked gsh processes
[polysh.git] / tests / gsh_tests.py
blob87c55eac871ebac9e7487e3e920b0a3a5eaed7c8
1 #!/usr/bin/env python
3 # This program is free software; you can redistribute it and/or modify
4 # it under the terms of the GNU General Public License as published by
5 # the Free Software Foundation; either version 2 of the License, or
6 # (at your option) any later version.
8 # This program is distributed in the hope that it will be useful,
9 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 # GNU Library General Public License for more details.
13 # You should have received a copy of the GNU General Public License
14 # along with this program; if not, write to the Free Software
15 # Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
17 # See the COPYING file for license information.
19 # Copyright (c) 2007, 2008 Guillaume Chazarain <guichaz@gmail.com>
21 import errno
22 import os
23 import unittest
24 import shutil
25 import sys
26 import optparse
27 import pexpect
28 import subprocess
29 import traceback
31 import coverage
33 PID_DIR = '.gsh_pids'
34 TESTS = unittest.TestSuite()
36 def iter_over_all_tests():
37 py_files = [p for p in os.listdir('tests') if p.endswith('.py')]
38 tests = list(set([p[:p.index('.')] for p in py_files]))
39 for name in tests:
40 module = getattr(__import__('tests.' + name), name)
41 for module_content in dir(module):
42 candidate = getattr(module, module_content)
43 if not isinstance(candidate, type):
44 continue
45 if not issubclass(candidate, unittest.TestCase):
46 continue
47 suite = unittest.defaultTestLoader.loadTestsFromTestCase(candidate)
48 for test_method in suite:
49 yield test_method
51 def import_all_tests():
52 for test in iter_over_all_tests():
53 TESTS.addTest(test)
55 def import_specified_tests(names):
56 for test in iter_over_all_tests():
57 test_name = test.id().split('.')[-1]
58 if test_name in names:
59 names.remove(test_name)
60 TESTS.addTest(test)
61 if names:
62 print 'Cannot find tests:', names
63 sys.exit(1)
65 def parse_cmdline():
66 usage='Usage: %s [OPTIONS...] [TESTS...]' % sys.argv[0]
67 parser = optparse.OptionParser(usage=usage)
68 parser.add_option('--coverage', action='store_true', dest='coverage',
69 default=False, help='include coverage tests')
70 parser.add_option('--log', type='str', dest='log',
71 help='log all pexpect I/O and gsh debug info')
72 options, args = parser.parse_args()
73 return options, args
75 def remove_coverage_files():
76 for filename in os.listdir('.'):
77 if filename.startswith('.coverage'):
78 os.remove(filename)
80 def end_coverage():
81 coverage.the_coverage.start()
82 coverage.the_coverage.collect()
83 coverage.the_coverage.stop()
84 modules = [p[:-3] for p in os.listdir('../gsh') if p.endswith('.py')]
85 coverage.report(['../gsh/%s.py' % (m) for m in modules])
86 remove_coverage_files()
87 # Prevent the atexit.register(the_coverage.save) from recreating the files
88 coverage.the_coverage.usecache = coverage.the_coverage.cache = None
90 def cleanup_gsh_pids():
91 for pid in map(int, os.listdir(PID_DIR)):
92 try:
93 os.kill(pid, 0)
94 except OSError:
95 # Good, process already killed
96 pass
97 else:
98 print '--------'
99 print 'gsh process %d leaked, launched at:' % pid
100 stack_path = '%s/%d' % (PID_DIR, pid)
101 subprocess.Popen(args=['cat', stack_path]).communicate()
102 shutil.rmtree(PID_DIR)
104 def main():
105 options, args = parse_cmdline()
106 if options.coverage:
107 remove_coverage_files()
108 try:
109 shutil.rmtree(PID_DIR)
110 except OSError, e:
111 assert e.errno == errno.ENOENT
112 os.mkdir(PID_DIR)
113 if args:
114 import_specified_tests(args)
115 else:
116 import_all_tests()
117 try:
118 unittest.main(argv=[sys.argv[0], '-v'], defaultTest='TESTS')
119 finally:
120 try:
121 cleanup_gsh_pids()
122 finally:
123 if options.coverage:
124 end_coverage()
127 class non_interactive_spawn(pexpect.spawn):
128 def __init__(self, argv, input_data, *args, **kwargs):
129 pexpect.spawn.__init__(self, None, *args, **kwargs)
130 self.use_native_pty_fork = False
131 self.argv = argv
132 self.input_data = input_data
133 self.command = argv[0]
134 self.args = argv[1:]
135 self._spawn(self.command, self.args)
137 def _spawn__fork_pty(self):
138 process = subprocess.Popen(self.argv, stdin=subprocess.PIPE,
139 stdout=subprocess.PIPE,
140 stderr=subprocess.STDOUT)
142 fd = process.stdin.fileno()
143 while self.input_data:
144 written = os.write(fd, self.input_data)
145 self.input_data = self.input_data[written:]
146 process.stdin.close()
147 # process will be garbage collected and process.stdout closed, so
148 # use a dupped fd.
149 return process.pid, os.dup(process.stdout.fileno())
151 def launch_gsh(args, input_data=None):
152 args = ['../gsh.py'] + args
153 options, unused_args = parse_cmdline()
154 if options.coverage:
155 args = ['./coverage.py', '-x', '-p'] + args
156 if options.log:
157 logfile = open(options.log, 'a', 0644)
158 args += ['--debug']
159 print >> logfile, 'Launching:', str(args)
160 else:
161 logfile = None
163 if input_data is None:
164 child = pexpect.spawn(args[0], args=args[1:], logfile=logfile)
165 else:
166 child = non_interactive_spawn(args, input_data, logfile=logfile)
167 traceback.print_stack(file=file('%s/%d' % (PID_DIR, child.pid), 'w'))
168 return child
170 if __name__ == '__main__':
171 main()