2 Utility functions for testing
5 import sys
, os
, shutil
, unittest
, random
, warnings
, threading
, time
, re
, glob
6 import tempfile
as tempfile_mod
9 from unittest_extensions
import SkipTest
12 from pygr
import logger
, classutil
19 # represents a test data
20 class TestData(object):
24 path_join
= pathfix
.path_join
# Re-export the main logger's reporting functions under short module names.
info = logger.info
error = logger.error
warn = logger.warn
debug = logger.debug

# 0 -> random port; overridden by runtest.
default_xmlrpc_port = 0
34 def approximate_cmp(x
, y
, delta
):
35 '''expects two lists of tuples. Performs comparison as usual,
36 except that numeric types are considered equal if they differ by
38 diff
= cmp(len(x
),len(y
))
41 x
.sort() # SORT TO ENSURE IN SAME ORDER...
43 for i
in range(len(x
)):
46 diff
= cmp(len(s
),len(t
))
49 for j
in range(len(s
)):
52 if isinstance(u
,int) or isinstance(u
,float):
def change_pygrdatapath(*args):
    """Point the PYGRDATAPATH environment variables at a new directory.

    The path components in ``args`` are combined with ``path_join``.  If the
    result is not an existing directory, ``stop`` is called with an error
    message.  Both ``PYGRDATAPATH`` and ``PYGRDATADOWNLOAD`` in this
    process's ``os.environ`` are then set to the joined path.
    """
    target = path_join(*args)
    if not os.path.isdir(target):
        stop('cannot access pygrdatapath %s' % target)
    for variable in ('PYGRDATAPATH', 'PYGRDATADOWNLOAD'):
        os.environ[variable] = target
78 def generate_coverage(func
, path
, *args
, **kwds
):
80 Generates code coverage for the function
81 and places the results in the path
84 from figleaf
import annotate_html
86 if os
.path
.isdir(path
):
89 # execute the function itself
90 return_vals
= func(*args
, **kwds
)
92 logger
.info('generating coverage')
93 coverage
= figleaf
.get_data().gather_files()
94 annotate_html
.prepare_reportdir(path
)
96 # skip python modules and the test modules
97 regpatt
= lambda patt
: re
.compile(patt
, re
.IGNORECASE
)
98 patterns
= map(regpatt
, [ 'python', 'tests' ])
99 annotate_html
.report_as_html(coverage
, path
, exclude_patterns
=patterns
,
104 class TempDir(object):
106 Returns a directory in the temporary directory, either named or a
110 def __init__(self
, prefix
, path
='tempdir'):
112 self
.tempdir
= path_join( pathfix
.curr_dir
, '..', path
)
113 self
.path
= self
.get_path()
114 atexit
.register(self
.remove
)
117 "Resets the root temporary directory"
119 logger
.debug('resetting path %s' % self
.tempdir
)
120 shutil
.rmtree(self
.path
, ignore_errors
=True)
121 shutil
.rmtree(self
.tempdir
, ignore_errors
=True)
122 self
.path
= self
.get_path()
125 if not os
.path
.isdir(self
.tempdir
):
126 os
.mkdir(self
.tempdir
)
127 path
= tempfile_mod
.mkdtemp(prefix
=self
.prefix
, dir=self
.tempdir
)
130 def randname(self
, prefix
='x'):
131 "Generates a random name"
132 id = prefix
+ str(random
.randint(0, 2**31))
135 def subfile(self
, name
=None):
137 Returns a path to a file in the temporary directory,
138 either the named or a random one
140 name
= name
or self
.randname(prefix
='f')
141 return path_join(self
.path
, name
)
144 "Removes the temporary directory"
145 #shutil.rmtree(self.path, ignore_errors=True)
148 class TestXMLRPCServer(object):
150 Runs XMLRPC server in the background with a list of pygr.Data resources
151 Makes server exit when this object is released. Because we want this to
152 work even on Windows, we can't use fork, backgrounding or any other
153 quasi-sensible method for running the server process in the background.
154 So we just use a separate thread to keep our caller from blocking...
157 PYGRDATAPATH: passed to the server process command line as its PYGRDATAPATH
158 checkResources: if True, first check that all pygrDataNames are loadable.
160 def __init__(self
, pygrDataNames
, pygrDataPath
, port
=0, downloadDB
=''):
161 'starts server, returns without blocking'
162 self
.pygrDataNames
= pygrDataNames
163 self
.pygrDataPath
= pygrDataPath
164 self
.downloadDB
= downloadDB
166 global default_xmlrpc_port
168 port
= default_xmlrpc_port
171 self
.port_file
= tempdatafile('xmlrpc_port_file', False)
173 # check that all resources are available
174 ## if kwargs.get('checkResources'):
175 ## map(pygr.Data.getResource, *pygrDataNames)
177 currdir
= os
.path
.dirname(__file__
)
178 self
.server_script
= path_join(currdir
, 'pygrdata_server.py')
181 self
.thread
= threading
.Thread(target
=self
.run_server
)
185 for i
in range(10): # retry several times in case server starts slowly
186 # wait for it to start
188 # retrieve port info from file saved by server
190 ifile
= open(self
.port_file
)
192 port
= int(ifile
.read())
193 break # exit the loop
195 ifile
.close() # make sure to close file no matter what
198 assert port
, "cannot get port info from server; is server running?"
199 self
.port
= port
# use the port returned by the server
201 def run_server(self
):
202 'this method blocks, so run it in a separate thread'
203 cmdArgs
= (sys
.executable
, self
.server_script
) + tuple(sys
.argv
) \
204 + ('--port-file=' + self
.port_file
,
205 '--pygrdatapath=' + self
.pygrDataPath
,
206 '--downloadDB=' + self
.downloadDB
,
207 '--resources=' + ':'.join(self
.pygrDataNames
))
208 if self
.port
: # only add port argument if set
209 cmdArgs
+= ('--port=' + str(self
.port
),)
210 p
= classutil
.FilePopen(cmdArgs
, stdout
=classutil
.PIPE
,
211 stderr
=classutil
.PIPE
)
213 logger
.debug('Starting XML-RPC server: ')
214 logger
.debug(repr(cmdArgs
))
216 logger
.warn('XML-RPC server command failed!')
217 output
= p
.stdout
.read()
218 errout
= p
.stderr
.read()
219 logger
.debug('XML-RPC server output: %s' % output
)
220 logger
.debug('XML-RPC server error out: %s' % errout
)
224 logger
.debug('server stopped')
228 s
= xmlrpclib
.ServerProxy('http://localhost:%d' % self
.port
)
229 s
.exit_now() # TELL THE SERVER TO EXIT
def make_suite(tests):
    """Build one TestSuite out of a list of TestCase classes.

    Every class in ``tests`` is loaded through a single ``TestLoader`` and
    the per-class suites are combined into the returned
    ``unittest.TestSuite``.
    """
    test_loader = unittest.TestLoader()
    per_class = [test_loader.loadTestsFromTestCase(case) for case in tests]
    return unittest.TestSuite(per_class)
241 Detects whether mysql is functional on the current system
245 except ImportError, exc
:
246 msg
= 'MySQLdb error: %s' % exc
250 from pygr
import sqlgraph
251 tempcurs
= sqlgraph
.get_name_cursor()[1]
252 # disable some MySQL specific spurious warnings, current scope only
253 warnings
.simplefilter("ignore")
254 tempcurs
.execute('create database if not exists test')
255 except Exception, exc
:
256 msg
= 'cannot operate on MySql database: %s' % exc
263 def sqlite_enabled():
265 Detects whether sqlite3 is functional on the current system
267 from pygr
.sqlgraph
import import_sqlite
269 sqlite
= import_sqlite() # from 2.5+ stdlib, or pysqlite2
270 except ImportError, exc
:
271 msg
= 'sqlite3 error: %s' % exc
277 class SQLite_Mixin(object):
278 'use this as a base for any test'
280 from pygr
.sqlgraph
import SQLiteServerInfo
281 if not sqlite_enabled():
283 self
.sqlite_file
= tempdatafile('test_sqlite.db', False)
284 self
.tearDown(False) # delete the file if it exists
285 self
.serverInfo
= SQLiteServerInfo(self
.sqlite_file
)
286 self
.sqlite_load() # load data provided by subclass method
287 def tearDown(self
, closeConnection
=True):
288 'delete the sqlite db file after (optionally) closing connection'
290 self
.serverInfo
.close() # close the database
292 os
.remove(self
.sqlite_file
)
296 def temp_table_name(dbname
='test'):
298 l
= [c
for c
in 'TeMpBiGdAcDy']
300 return dbname
+'.'+''.join(l
)
def drop_tables(cursor, tablename):
    """Drop a table and its companion ``<tablename>_schema`` table.

    Issues two ``drop table if exists`` statements through
    ``cursor.execute``: first for ``tablename`` itself, then for
    ``tablename`` with ``_schema`` appended.  The name is interpolated
    directly into the SQL text.
    """
    statements = ('drop table if exists %s',
                  'drop table if exists %s_schema')
    for template in statements:
        cursor.execute(template % tablename)
306 _blast_enabled
= None # cache results of blast_enabled()
310 Detects whether the blast suite is functional on the current system
312 global _blast_enabled
313 if _blast_enabled
is not None:
314 return _blast_enabled
316 p
= classutil
.FilePopen(('blastall',), stdout
=classutil
.PIPE
)
318 p
.wait() # try to run the program
320 warn('NCBI toolkit (blastall) missing?')
321 _blast_enabled
= False
325 _blast_enabled
= True
DATADIR = path_join(pathfix.curr_dir, '..', 'data')
TEMPROOT = TempDir('tempdir')
TEMPDIR = TEMPROOT.path


# shortcuts for creating full paths to files in the data and temporary
# directories
def datafile(name):
    "Returns the full path of the named file inside DATADIR"
    return path_join(DATADIR, name)
def tempdatafile(name, errorIfExists=True, copyData=False):
    """Return the full path of ``name`` inside the temporary directory.

    name: file name, joined onto TEMPDIR with path_join.
    errorIfExists: if true, raise AssertionError when the path already
        exists.
    copyData: if true, copy the data file of the same name (as located by
        datafile) to the temporary path.

    Returns the path inside TEMPDIR.
    """
    filepath = path_join(TEMPDIR, name)
    if errorIfExists and os.path.exists(filepath):
        raise AssertionError('tempdatafile %s already exists!' % name)
    if copyData:  # copy data file to new location
        shutil.copyfile(datafile(name), filepath)
    # callers store the result and later open/remove it, so hand the path back
    return filepath
def remove_files(path, patterns=("*.seqlen",)):
    """Removes files matching any pattern in the list.

    path: directory whose entries are matched.
    patterns: iterable of glob patterns; each is joined onto ``path`` with
        path_join and expanded with glob.glob.  The default is an immutable
        tuple so it cannot be altered between calls.
    """
    for patt in patterns:
        fullpatt = path_join(path, patt)
        for name in glob.glob(fullpatt):
            os.remove(name)
354 def get_file_md5(fpath
):
355 ifile
= file(fpath
, 'rb')
357 h
= hashlib
.md5(ifile
.read())
362 if __name__
== '__main__':
363 t
= TempDir('tempdir')