testutil needed new name of get_name_cursor()
[pygr.git] / tests / testlib / testutil.py
blobdbdcb3f9e008e45a0c403cd8363c9c2dcd4306a1
1 """
2 Utility functions for testing
3 """
5 import sys, os, shutil, unittest, random, warnings, threading, time, re, glob
6 import tempfile as tempfile_mod
7 import atexit
9 from unittest_extensions import SkipTest
11 import pathfix
12 from pygr import logger, classutil
14 try:
15 import hashlib
16 except ImportError:
17 import md5 as hashlib
class TestData(object):
    """Empty placeholder class that represents a piece of test data.

    It defines no attributes or methods of its own.
    """
# shortcut to the path-joining helper provided by pathfix
path_join = pathfix.path_join

# expose the main logger's functions under short module-level names
info = logger.info
error = logger.error
warn = logger.warn
debug = logger.debug

# global port setting
default_xmlrpc_port = 0  # 0 -> random port; overridden by runtest.
32 ###
def approximate_cmp(x, y, delta):
    """Compare two lists of tuples, tolerating small numeric differences.

    Both lists are compared in sorted order, tuple by tuple and field by
    field.  A field whose left-hand value is an int or float is considered
    equal to its counterpart when the two differ by no more than delta;
    every other field is compared exactly.  Only the left-hand value's type
    is inspected, so the right-hand value must support subtraction from it.

    The arguments are not modified: sorted copies are compared.

    Returns 0 if the lists match, otherwise a negative or positive number
    giving the direction of the first difference (list length, then tuple
    length, then field value).
    """
    def _cmp(a, b):
        # three-way comparison; does not depend on the cmp() builtin
        return (a > b) - (a < b)

    diff = _cmp(len(x), len(y))
    if diff != 0:
        return diff
    # sort copies so both sides are in the same order without touching
    # the caller's lists
    for s, t in zip(sorted(x), sorted(y)):
        diff = _cmp(len(s), len(t))
        if diff != 0:
            return diff
        for u, v in zip(s, t):
            if isinstance(u, (int, float)):
                diff = u - v
                if diff < -delta:
                    return -1
                elif diff > delta:
                    return 1
            else:
                diff = _cmp(u, v)
                if diff != 0:
                    return diff
    return 0
def stop(text):
    """Unrecoverable error: log text as an error and terminate.

    Raises SystemExit with exit status 1 so that the calling shell or test
    harness sees the run as failed (a bare sys.exit() would report
    success).
    """
    logger.error(text)
    sys.exit(1)
def change_pygrdatapath(*args):
    """Overwrites the PYGRDATAPATH environment variable (local copy).

    The path components in args are joined with path_join.  If the result
    is not an existing directory, stop() is called.  Otherwise both
    PYGRDATAPATH and PYGRDATADOWNLOAD in os.environ are set to it, and
    pygr.Data is imported afterwards.
    """
    new_path = path_join(*args)
    if not os.path.isdir(new_path):
        stop('cannot access pygrdatapath %s' % new_path)
    for variable in ('PYGRDATAPATH', 'PYGRDATADOWNLOAD'):
        os.environ[variable] = new_path
    # imported only after the environment has been updated
    import pygr.Data
def generate_coverage(func, path, *args, **kwds):
    """
    Runs func(*args, **kwds), then writes a figleaf HTML coverage report
    into the directory path.

    An existing directory at path is deleted first.  Files matching
    'python' or 'tests' (case-insensitive) are excluded from the report.
    Returns whatever func returned.
    """
    import figleaf
    from figleaf import annotate_html

    # start from an empty report directory
    if os.path.isdir(path):
        shutil.rmtree(path)

    # execute the function itself
    result = func(*args, **kwds)

    logger.info('generating coverage')
    coverage = figleaf.get_data().gather_files()
    annotate_html.prepare_reportdir(path)

    # skip python modules and the test modules
    excluded = [re.compile(patt, re.IGNORECASE)
                for patt in ('python', 'tests')]
    annotate_html.report_as_html(coverage, path, exclude_patterns=excluded,
                                 files_list='')

    return result
class TempDir(object):
    """
    Creates a uniquely named directory (its name starts with prefix)
    inside a root temporary directory; the new directory is available
    as self.path.
    """

    def __init__(self, prefix, path='tempdir'):
        # the root directory lives one level above pathfix.curr_dir
        self.prefix = prefix
        self.tempdir = path_join(pathfix.curr_dir, '..', path)
        self.path = self.get_path()
        atexit.register(self.remove)

    def reset(self):
        "Deletes the root temporary directory and creates a fresh path"
        logger.debug('resetting path %s' % self.tempdir)
        for stale in (self.path, self.tempdir):
            shutil.rmtree(stale, ignore_errors=True)
        self.path = self.get_path()

    def get_path(self):
        "Creates (and returns) a new directory under the root directory"
        root = self.tempdir
        if not os.path.isdir(root):
            os.mkdir(root)
        return tempfile_mod.mkdtemp(prefix=self.prefix, dir=root)

    def randname(self, prefix='x'):
        "Generates a random name: prefix followed by a random integer"
        return prefix + str(random.randint(0, 2**31))

    def subfile(self, name=None):
        """
        Returns a path to a file in the temporary directory,
        either the named or a random one
        """
        if not name:
            name = self.randname(prefix='f')
        return path_join(self.path, name)

    def remove(self):
        "Registered to run at exit; currently does nothing"
        # the actual removal is commented out:
        #shutil.rmtree(self.path, ignore_errors=True)
        return None
class TestXMLRPCServer(object):
    """
    Runs XMLRPC server in the background with a list of pygr.Data resources
    Makes server exit when this object is released. Because we want this to
    work even on Windows, we can't use fork, backgrounding or any other
    quasi-sensible method for running the server process in the background.
    So we just use a separate thread to keep our caller from blocking...

    Arguments:
    pygrDataNames: resource names; joined with ':' and passed to the server
      command line as --resources
    pygrDataPath: passed to the server command line as --pygrdatapath
    port: requested port; if false, default_xmlrpc_port is used, and if
      that is also false no --port argument is passed.  In every case
      self.port ends up as the port number the server wrote to its port
      file.
    downloadDB: passed to the server command line as --downloadDB
    """

    def __init__(self, pygrDataNames, pygrDataPath, port=0, downloadDB=''):
        'starts server, returns without blocking'
        self.pygrDataNames = pygrDataNames
        self.pygrDataPath = pygrDataPath
        self.downloadDB = downloadDB

        if not port:
            port = default_xmlrpc_port

        self.port = port
        self.port_file = tempdatafile('xmlrpc_port_file', False)

        # check that all resources are available
        ## if kwargs.get('checkResources'):
        ##     map(pygr.Data.getResource, *pygrDataNames)

        currdir = os.path.dirname(__file__)
        self.server_script = path_join(currdir, 'pygrdata_server.py')

        # start the thread
        self.thread = threading.Thread(target=self.run_server)
        self.thread.start()

        port = None
        for i in range(10): # retry several times in case server starts slowly
            # wait for it to start
            time.sleep(1)
            # retrieve port info from file saved by server
            try:
                ifile = open(self.port_file)
                try:
                    port = int(ifile.read())
                    break # exit the loop
                finally:
                    ifile.close() # make sure to close file no matter what
            except (IOError, ValueError):
                # IOError: the file does not exist yet.
                # ValueError: the file exists but does not hold a number
                # yet (e.g. created but still empty); retry instead of
                # aborting.
                pass
        assert port, "cannot get port info from server; is server running?"
        self.port = port # use the port returned by the server

    def run_server(self):
        'this method blocks, so run it in a separate thread'
        # our own command line arguments are forwarded to the server script
        cmdArgs = (sys.executable, self.server_script) + tuple(sys.argv) \
                  + ('--port-file=' + self.port_file,
                     '--pygrdatapath=' + self.pygrDataPath,
                     '--downloadDB=' + self.downloadDB,
                     '--resources=' + ':'.join(self.pygrDataNames))
        if self.port: # only add port argument if set
            cmdArgs += ('--port=' + str(self.port),)
        p = classutil.FilePopen(cmdArgs, stdout=classutil.PIPE,
                                stderr=classutil.PIPE)
        try:
            logger.debug('Starting XML-RPC server: ')
            logger.debug(repr(cmdArgs))
            if p.wait(): # true return value is treated as failure
                logger.warn('XML-RPC server command failed!')
            output = p.stdout.read()
            errout = p.stderr.read()
            logger.debug('XML-RPC server output: %s' % output)
            logger.debug('XML-RPC server error out: %s' % errout)
        finally:
            p.close()

        logger.debug('server stopped')

    def close(self):
        'asks the server to exit, then waits for the server thread to end'
        import xmlrpclib
        s = xmlrpclib.ServerProxy('http://localhost:%d' % self.port)
        s.exit_now() # TELL THE SERVER TO EXIT

        self.thread.join()
def make_suite(tests):
    "Builds a single unittest.TestSuite from a list of TestCase classes"
    load_case = unittest.TestLoader().loadTestsFromTestCase
    per_class_suites = [load_case(case) for case in tests]
    return unittest.TestSuite(per_class_suites)
def mysql_enabled():
    """
    Detects whether mysql is functional on the current system

    Returns False (after logging a warning) if MySQLdb cannot be imported,
    or if obtaining a cursor from sqlgraph.get_name_cursor() or executing
    'create database if not exists test' raises any exception.  Returns
    True otherwise.
    """
    try:
        import MySQLdb
    except ImportError:
        exc = sys.exc_info()[1] # portable way to get the caught exception
        msg = 'MySQLdb error: %s' % exc
        warn(msg)
        return False
    try:
        from pygr import sqlgraph
        tempcurs = sqlgraph.get_name_cursor()[1]
        # disable some MySQL specific spurious warnings.  simplefilter()
        # changes the process-wide filter list, so save it and restore it
        # afterwards to keep the change limited to this statement.
        saved_filters = warnings.filters[:]
        try:
            warnings.simplefilter("ignore")
            tempcurs.execute('create database if not exists test')
        finally:
            warnings.filters[:] = saved_filters
    except Exception:
        exc = sys.exc_info()[1]
        msg = 'cannot operate on MySql database: %s' % exc
        warn(msg)
        return False

    return True
def sqlite_enabled():
    """
    Detects whether sqlite3 is functional on the current system

    Returns True if pygr.sqlgraph.import_sqlite() succeeds; logs a warning
    and returns False if it raises ImportError.
    """
    from pygr.sqlgraph import import_sqlite
    try:
        import_sqlite() # from 2.5+ stdlib, or pysqlite2
    except ImportError:
        warn('sqlite3 error: %s' % sys.exc_info()[1])
        return False
    return True
class SQLite_Mixin(object):
    'use this as a base for any test'

    def setUp(self):
        """Creates a fresh sqlite database file and loads the test data.

        Raises SkipTest if sqlite is not available.  The subclass must
        provide sqlite_load(), which is called last.
        """
        from pygr.sqlgraph import SQLiteServerInfo
        if not sqlite_enabled():
            raise SkipTest
        db_path = tempdatafile('test_sqlite.db', False)
        self.sqlite_file = db_path
        # no connection exists yet: only delete a leftover file, if any
        self.tearDown(False)
        self.serverInfo = SQLiteServerInfo(db_path)
        self.sqlite_load() # load data provided by subclass method

    def tearDown(self, closeConnection=True):
        'delete the sqlite db file after (optionally) closing connection'
        if closeConnection:
            self.serverInfo.close() # close the database
        try:
            os.remove(self.sqlite_file)
        except OSError:
            # best effort; e.g. the file does not exist
            pass
def temp_table_name(dbname='test'):
    """Returns 'dbname.' followed by a random shuffle of the letters
    'TeMpBiGdAcDy'."""
    letters = list('TeMpBiGdAcDy')
    random.shuffle(letters)
    shuffled = ''.join(letters)
    return dbname + '.' + shuffled
def drop_tables(cursor, tablename):
    """Drops the table tablename and the table tablename_schema, if they
    exist, using cursor.

    tablename is interpolated directly into the SQL text.
    """
    templates = ('drop table if exists %s',
                 'drop table if exists %s_schema')
    for template in templates:
        cursor.execute(template % tablename)
_blast_enabled = None # cache results of blast_enabled()

def blast_enabled():
    """
    Detects whether the blast suite is functional on the current system

    Tries to run 'blastall' once.  An OSError while running it is taken to
    mean that the program is missing: a warning is logged and False is
    returned.  Otherwise True is returned; the exit status of the program
    is not examined.  The result is cached in _blast_enabled, so the
    program is run at most once per process.
    """
    global _blast_enabled
    if _blast_enabled is not None:
        return _blast_enabled

    p = classutil.FilePopen(('blastall',), stdout=classutil.PIPE)
    try:
        try:
            p.wait() # try to run the program
        except OSError:
            warn('NCBI toolkit (blastall) missing?')
            _blast_enabled = False
        else:
            _blast_enabled = True
    finally:
        p.close() # release the handle on the failure path as well

    return _blast_enabled
# the 'data' directory one level above pathfix.curr_dir
DATADIR = path_join(pathfix.curr_dir, '..', 'data')
# creating the TempDir instance also creates the temporary directory
TEMPROOT = TempDir('tempdir')
TEMPDIR = TEMPROOT.path

# shortcut for creating full paths to files in the data directory
def datafile(name):
    "Returns the full path of the file called name in DATADIR"
    return path_join(DATADIR, name)
def tempdatafile(name, errorIfExists=True, copyData=False):
    """Returns the full path of the file called name in TEMPDIR.

    errorIfExists: if true, raise AssertionError when that path already
      exists.
    copyData: if true, first copy the data file of the same name (see
      datafile) to that path.
    """
    target = path_join(TEMPDIR, name)
    if errorIfExists and os.path.exists(target):
        raise AssertionError('tempdatafile %s already exists!' % name)
    if copyData:
        # copy data file to new location
        source = datafile(name)
        shutil.copyfile(source, target)
    return target
def remove_files( path, patterns=[ "*.seqlen" ]):
    """Removes the files in path that match any glob pattern in the list.

    The default pattern list is only read, never modified.
    """
    for pattern in patterns:
        matches = glob.glob(path_join(path, pattern))
        for filename in matches:
            os.remove(filename)
def get_file_md5(fpath):
    """Returns the MD5 hash object of the contents of the file fpath.

    The file is read in binary mode, in chunks, so that large files do not
    have to fit into memory.  The return value is the hash object itself;
    call hexdigest() or digest() on it to get the checksum.
    """
    chunk_size = 1024 * 1024
    ifile = open(fpath, 'rb')
    try:
        h = hashlib.md5()
        while True:
            chunk = ifile.read(chunk_size)
            if not chunk: # end of file
                break
            h.update(chunk)
    finally:
        ifile.close()
    return h
if __name__ == '__main__':
    # when run as a script: create a temporary directory, then reset it
    TempDir('tempdir').reset()

    #TestXMLRPCServer()