Merge commit 'remotes/ctb/xmlrpc_patches' into tryme
[pygr.git] / tests / metabase_test.py
blob00c23bf7beeebe2b833924fec31219cd454c0272
1 import socket, unittest, os, pickle, datetime
2 from testlib import testutil, PygrTestProgram, SkipTest
3 from pygr import seqdb, cnestedlist, metabase, mapping, logger, sqlgraph
4 from pygr.downloader import SourceURL, GenericBuilder, uncompress_file, \
5 do_unzip, do_gunzip
7 try:
8 set
9 except NameError:
10 from sets import Set as set
12 class TestBase(unittest.TestCase):
13 "A base class to all metabase test classes"
15 def setUp(self, worldbasePath=None, **kwargs):
16 # overwrite the WORLDBASEPATH environment variable
17 self.tempdir = testutil.TempDir('pygrdata')
18 if worldbasePath is None:
19 worldbasePath = self.tempdir.path
20 self.metabase = metabase.MetabaseList(worldbasePath, **kwargs)
21 self.pygrData = self.metabase.Data
22 self.schema = self.metabase.Schema
23 # handy shortcuts
24 self.EQ = self.assertEqual
26 class Download_Test(TestBase):
27 "Save seq db and interval to metabase shelve"
29 # tested elsewhere as well, on Linux makes gzip ask for permissions
30 # to overwrite
31 def test_download(self):
32 "Downloading of gzipped file using metabase"
34 url = SourceURL('http://www.doe-mbi.ucla.edu/~leec/test.gz')
35 url.__doc__ = 'test download'
37 self.metabase.add_resource('Bio.Test.Download1', url)
38 self.metabase.commit()
40 # performs the download
41 fpath = self.pygrData.Bio.Test.Download1()
42 h = testutil.get_file_md5(fpath)
43 self.assertEqual(h.hexdigest(), 'f95656496c5182d6cff9a56153c9db73')
44 os.remove(fpath)
45 def test_run_unzip(self):
46 'test uncompress_file unzip'
47 zipfile = testutil.datafile('test.zip')
48 outfile = testutil.tempdatafile('test.out')
49 uncompress_file(zipfile, newpath=outfile, singleFile=True)
50 h = testutil.get_file_md5(outfile)
51 self.assertEqual(h.hexdigest(), '12ada4c51ccb4c7277c16f1a3c000b90')
52 def test_do_unzip(self):
53 'test do_unzip'
54 zipfile = testutil.datafile('test.zip')
55 outfile = testutil.tempdatafile('test2.out')
56 do_unzip(zipfile, outfile, singleFile=True)
57 h = testutil.get_file_md5(outfile)
58 self.assertEqual(h.hexdigest(), '12ada4c51ccb4c7277c16f1a3c000b90')
59 def test_run_gunzip(self):
60 'test uncompress_file gunzip'
61 zipfile = testutil.datafile('test.gz')
62 outfile = testutil.tempdatafile('test3.out')
63 uncompress_file(zipfile, newpath=outfile)
64 h = testutil.get_file_md5(outfile)
65 self.assertEqual(h.hexdigest(), '1db5a21a01ba465fd26c3203d6589b0e')
66 def test_do_gunzip(self):
67 'test do_gunzip'
68 zipfile = testutil.datafile('test.gz')
69 outfile = testutil.tempdatafile('test4.out')
70 do_gunzip(zipfile, outfile)
71 h = testutil.get_file_md5(outfile)
72 self.assertEqual(h.hexdigest(), '1db5a21a01ba465fd26c3203d6589b0e')
74 class GenericBuild_Test(TestBase):
76 def test_generic_build(self):
77 "GenericBuilder construction of the BlastDB"
79 sp_hbb1 = testutil.datafile('sp_hbb1')
80 gb = GenericBuilder('BlastDB', sp_hbb1)
81 s = pickle.dumps(gb)
82 db = pickle.loads(s) # force construction of the BlastDB
83 self.EQ(len(db), 24)
85 found = [x for x in db]
86 found.sort()
88 expected = ['HBB0_PAGBO', 'HBB1_ANAMI', 'HBB1_CYGMA', 'HBB1_IGUIG',
89 'HBB1_MOUSE', 'HBB1_ONCMY', 'HBB1_PAGBO', 'HBB1_RAT',
90 'HBB1_SPHPU', 'HBB1_TAPTE', 'HBB1_TORMA', 'HBB1_TRICR',
91 'HBB1_UROHA', 'HBB1_VAREX', 'HBB1_XENBO', 'HBB1_XENLA',
92 'HBB1_XENTR', 'MYG_DIDMA', 'MYG_ELEMA', 'MYG_ERIEU',
93 'MYG_ESCGI', 'MYG_GALCR', 'PRCA_ANASP', 'PRCA_ANAVA']
94 expected.sort()
96 self.EQ(expected, found)
98 class DNAAnnotation_Test(TestBase):
100 def setUp(self,**kwargs):
101 TestBase.setUp(self)
102 dnaseq = testutil.datafile('dnaseq.fasta')
103 tryannot = testutil.tempdatafile('tryannot')
105 db = seqdb.BlastDB(dnaseq)
106 try:
107 db.__doc__ = 'little dna'
109 self.pygrData.Bio.Test.dna = db
110 annoDB = seqdb.AnnotationDB({1:('seq1',5,10,'fred'),
111 2:('seq1',-60,-50,'bob'),
112 3:('seq2',-20,-10,'mary')},
114 sliceAttrDict=dict(id=0, start=1, stop=2,
115 name=3))
116 annoDB.__doc__ = 'trivial annotation'
117 self.pygrData.Bio.Test.annoDB = annoDB
118 nlmsa = cnestedlist.NLMSA(tryannot,'w',pairwiseMode=True,
119 bidirectional=False)
120 try:
121 for annID in annoDB:
122 nlmsa.addAnnotation(annoDB[annID])
124 nlmsa.build()
125 nlmsa.__doc__ = 'trivial map'
126 self.pygrData.Bio.Test.map = nlmsa
127 self.schema.Bio.Test.map = metabase.ManyToManyRelation(db,
128 annoDB,bindAttrs=('exons',))
129 self.metabase.commit()
130 self.metabase.clear_cache()
131 finally:
132 nlmsa.close()
133 finally:
134 db.close()
136 def test_annotation(self):
137 "Annotation test"
138 db = self.pygrData.Bio.Test.dna()
139 try:
140 s1 = db['seq1']
141 l = s1.exons.keys()
142 annoDB = self.pygrData.Bio.Test.annoDB()
143 assert l == [annoDB[1], -(annoDB[2])]
144 assert l[0].sequence == s1[5:10]
145 assert l[1].sequence == s1[50:60]
146 assert l[0].name == 'fred','test annotation attribute access'
147 assert l[1].name == 'bob'
148 sneg = -(s1[:55])
149 l = sneg.exons.keys()
150 assert l == [annoDB[2][5:], -(annoDB[1])]
151 assert l[0].sequence == -(s1[50:55])
152 assert l[1].sequence == -(s1[5:10])
153 assert l[0].name == 'bob'
154 assert l[1].name == 'fred'
155 finally:
156 db.close() # close SequenceFileDB
157 self.pygrData.Bio.Test.map().close() # close NLMSA
160 def populate_swissprot(pygrData, pygrDataSchema):
161 "Populate the current pygrData with swissprot data"
162 # build BlastDB out of the sequences
163 sp_hbb1 = testutil.datafile('sp_hbb1')
164 sp = seqdb.BlastDB(sp_hbb1)
165 sp.__doc__ = 'little swissprot'
166 pygrData.Bio.Seq.Swissprot.sp42 = sp
168 # also store a fragment
169 hbb = sp['HBB1_TORMA']
170 ival= hbb[10:35]
171 ival.__doc__ = 'fragment'
172 pygrData.Bio.Seq.frag = ival
174 # build a mapping to itself
175 m = mapping.Mapping(sourceDB=sp,targetDB=sp)
176 trypsin = sp['PRCA_ANAVA']
177 m[hbb] = trypsin
178 m.__doc__ = 'map sp to itself'
179 pygrData.Bio.Seq.spmap = m
181 # create an annotation database and bind as exons attribute
182 pygrDataSchema.Bio.Seq.spmap = metabase.OneToManyRelation(sp, sp,
183 bindAttrs=('buddy',))
184 annoDB = seqdb.AnnotationDB({1:('HBB1_TORMA',10,50)}, sp,
185 sliceAttrDict=dict(id=0, start=1, stop=2))
186 exon = annoDB[1]
188 # generate the names where these will be stored
189 tempdir = testutil.TempDir('exonAnnot')
190 filename = tempdir.subfile('cnested')
191 nlmsa = cnestedlist.NLMSA(filename, 'w', pairwiseMode=True,
192 bidirectional=False)
193 nlmsa.addAnnotation(exon)
194 nlmsa.build()
195 annoDB.__doc__ = 'a little annotation db'
196 nlmsa.__doc__ = 'a little map'
197 pygrData.Bio.Annotation.annoDB = annoDB
198 pygrData.Bio.Annotation.map = nlmsa
199 pygrDataSchema.Bio.Annotation.map = \
200 metabase.ManyToManyRelation(sp, annoDB, bindAttrs=('exons',))
202 def check_match(self):
203 frag = self.pygrData.Bio.Seq.frag()
204 correct = self.pygrData.Bio.Seq.Swissprot.sp42()['HBB1_TORMA'][10:35]
205 assert frag == correct, 'seq ival should match'
206 assert frag.__doc__ == 'fragment', 'docstring should match'
207 assert str(frag) == 'IQHIWSNVNVVEITAKALERVFYVY', 'letters should match'
208 assert len(frag) == 25, 'length should match'
209 assert len(frag.path) == 142, 'length should match'
211 #store = PygrDataTextFile('results/seqdb1.pickle')
212 #saved = store['hbb1 fragment']
213 #assert frag == saved, 'seq ival should matched stored result'
215 def check_dir(self):
216 expected=['Bio.Annotation.annoDB', 'Bio.Annotation.map',
217 'Bio.Seq.Swissprot.sp42', 'Bio.Seq.frag', 'Bio.Seq.spmap']
218 expected.sort()
219 found = self.metabase.dir('Bio')
220 found.sort()
221 assert found == expected
223 def check_dir_noargs(self):
224 found = self.metabase.dir()
225 found.sort()
226 found2 = self.metabase.dir('')
227 found2.sort()
228 assert found == found2
230 def check_dir_download(self):
231 found = self.metabase.dir(download=True)
232 found.sort()
233 found2 = self.metabase.dir('', download=True)
234 found2.sort()
235 assert len(found) == 0
236 assert found == found2
238 def check_dir_re(self):
239 expected=['Bio.Annotation.annoDB', 'Bio.Annotation.map',
240 'Bio.Seq.Swissprot.sp42', 'Bio.Seq.frag', 'Bio.Seq.spmap']
241 expected.sort()
242 found = self.metabase.dir('^Bio', 'r')
243 found.sort()
244 assert found == expected
246 expected = ['Bio.Seq.Swissprot.sp42', 'Bio.Seq.spmap']
247 expected.sort()
248 found = self.metabase.dir('^Bio\..+\.sp', 'r')
249 found.sort()
250 assert found == expected
252 def check_bind(self):
253 sp = self.pygrData.Bio.Seq.Swissprot.sp42()
254 hbb = sp['HBB1_TORMA']
255 trypsin = sp['PRCA_ANAVA']
256 assert hbb.buddy == trypsin, 'automatic schema attribute binding'
258 def check_bind2(self):
259 sp = self.pygrData.Bio.Seq.Swissprot.sp42()
260 hbb = sp['HBB1_TORMA']
261 exons = hbb.exons.keys()
262 assert len(exons)==1, 'number of expected annotations'
263 annoDB = self.pygrData.Bio.Annotation.annoDB()
264 exon = annoDB[1]
265 assert exons[0] == exon, 'test annotation comparison'
266 assert exons[0].pathForward is exon,'annotation parent match'
267 assert exons[0].sequence == hbb[10:50],'annotation to sequence match'
268 onc = sp['HBB1_ONCMY']
269 try:
270 exons = onc.exons.keys()
271 raise ValueError('failed to catch query with no annotations')
272 except KeyError:
273 pass
275 class Sequence_Test(TestBase):
276 def setUp(self, *args, **kwargs):
277 TestBase.setUp(self, *args, **kwargs)
278 populate_swissprot(self.pygrData, self.schema)
279 self.metabase.commit() # finally save everything
280 self.metabase.clear_cache() # force all requests to reload
282 def test_match(self):
283 "Test matching sequences"
284 check_match(self)
286 def test_dir(self):
287 "Test labels"
288 check_dir(self)
289 check_dir_noargs(self)
290 check_dir_re(self)
292 def test_bind(self):
293 "Test bind"
294 check_bind(self)
295 check_bind2(self)
297 def test_schema(self):
298 "Test schema"
299 sp_hbb1 = testutil.datafile('sp_hbb1')
300 sp2 = seqdb.BlastDB(sp_hbb1)
301 sp2.__doc__ = 'another sp'
302 self.pygrData.Bio.Seq.sp2 = sp2
303 sp = self.pygrData.Bio.Seq.Swissprot.sp42()
304 m = mapping.Mapping(sourceDB=sp,targetDB=sp2)
305 m.__doc__ = 'sp -> sp2'
306 self.pygrData.Bio.Seq.testmap = m
307 self.schema.Bio.Seq.testmap = metabase.OneToManyRelation(sp, sp2)
308 self.metabase.commit()
310 self.metabase.clear_cache()
312 sp3 = seqdb.BlastDB(sp_hbb1)
313 sp3.__doc__ = 'sp number 3'
314 self.pygrData.Bio.Seq.sp3 = sp3
315 sp2 = self.pygrData.Bio.Seq.sp2()
316 m = mapping.Mapping(sourceDB=sp3,targetDB=sp2)
317 m.__doc__ = 'sp3 -> sp2'
318 self.pygrData.Bio.Seq.testmap2 = m
319 self.schema.Bio.Seq.testmap2 = metabase.OneToManyRelation(sp3, sp2)
320 l = self.metabase.resourceCache.keys()
321 l.sort()
322 assert l == ['Bio.Seq.sp2', 'Bio.Seq.sp3', 'Bio.Seq.testmap2']
323 self.metabase.commit()
324 g = self.metabase.writer.storage.graph
325 expected = set(['Bio.Annotation.annoDB',
326 'Bio.Seq.Swissprot.sp42', 'Bio.Seq.sp2', 'Bio.Seq.sp3'])
327 found = set(g.keys())
328 self.EQ(len(expected - found), 0)
330 class SQL_Sequence_Test(Sequence_Test):
331 def setUp(self):
332 if not testutil.mysql_enabled():
333 raise SkipTest
335 self.dbtable = testutil.temp_table_name() # create temp db tables
336 Sequence_Test.setUp(self, worldbasePath='mysql:' + self.dbtable,
337 mdbArgs=dict(createLayer='temp'))
338 def tearDown(self):
339 testutil.drop_tables(self.metabase.writer.storage.cursor, self.dbtable)
341 class InvalidPickle_Test(TestBase):
343 def setUp(self):
344 TestBase.setUp(self)
345 class MyUnpicklableClass(object):
346 pass
347 MyUnpicklableClass.__module__ = '__main__'
348 self.bad = MyUnpicklableClass()
350 self.good = datetime.datetime.today()
352 def test_invalid_pickle(self):
353 "Testing an invalid pickle"
354 s = metabase.dumps(self.good) # should pickle with no errors
355 try:
356 s = metabase.dumps(self.bad) # should raise exception
357 msg = 'failed to catch bad attempt to invalid module ref'
358 raise ValueError(msg)
359 except metabase.WorldbaseNoModuleError:
360 pass
362 class DBServerInfo_Test(TestBase):
363 def setUp(self):
364 TestBase.setUp(self)
365 logger.debug('accessing ensembldb.ensembl.org')
366 conn = sqlgraph.DBServerInfo(host='ensembldb.ensembl.org',
367 user='anonymous', passwd='')
368 try:
369 translationDB = sqlgraph.SQLTable('homo_sapiens_core_47_36i.translation',
370 serverInfo=conn)
371 exonDB = sqlgraph.SQLTable('homo_sapiens_core_47_36i.exon',
372 serverInfo=conn)
374 sql_statement = '''SELECT t3.exon_id FROM
375 homo_sapiens_core_47_36i.translation AS tr,
376 homo_sapiens_core_47_36i.exon_transcript AS t1,
377 homo_sapiens_core_47_36i.exon_transcript AS t2,
378 homo_sapiens_core_47_36i.exon_transcript AS t3 WHERE tr.translation_id = %s
379 AND tr.transcript_id = t1.transcript_id AND t1.transcript_id =
380 t2.transcript_id AND t2.transcript_id = t3.transcript_id AND t1.exon_id =
381 tr.start_exon_id AND t2.exon_id = tr.end_exon_id AND t3.rank >= t1.rank AND
382 t3.rank <= t2.rank ORDER BY t3.rank
384 translationExons = sqlgraph.GraphView(translationDB, exonDB,
385 sql_statement,
386 serverInfo=conn)
387 except ImportError:
388 raise SkipTest('missing MySQLdb module?')
389 translationExons.__doc__ = 'test saving exon graph'
390 self.pygrData.Bio.Ensembl.TranslationExons = translationExons
391 self.metabase.commit()
392 self.metabase.clear_cache()
394 def test_orderBy(self):
395 """Test saving DBServerInfo to metabase"""
396 translationExons = self.pygrData.Bio.Ensembl.TranslationExons()
397 translation = translationExons.sourceDB[15121]
398 exons = translationExons[translation] # do the query
399 result = [e.id for e in exons]
400 correct = [95160,95020,95035,95050,95059,95069,95081,95088,95101,
401 95110,95172]
402 self.assertEqual(result, correct) # make sure the exact order matches
405 class XMLRPC_Test(TestBase):
406 'create an XMLRPC server and access seqdb from it'
407 def setUp(self):
408 TestBase.setUp(self)
409 populate_swissprot(self.pygrData, self.schema) # save some data
410 self.metabase.commit() # finally save everything
411 self.metabase.clear_cache() # force all requests to reload
413 res = [ 'Bio.Seq.Swissprot.sp42', 'Bio.Seq.frag', 'Bio.Seq.spmap',
414 'Bio.Annotation.annoDB', 'Bio.Annotation.map' ]
415 self.server = testutil.TestXMLRPCServer(res, self.tempdir.path)
416 def test_xmlrpc(self):
417 "Test XMLRPC"
418 self.metabase.clear_cache() # force all requests to reload
419 self.metabase.update("http://localhost:%s" % self.server.port)
421 check_match(self)
422 check_dir(self)
423 check_dir_noargs(self)
424 check_dir_download(self)
425 check_dir_re(self)
426 check_bind(self)
427 check_bind2(self)
429 sb_hbb1 = testutil.datafile('sp_hbb1')
430 sp2 = seqdb.BlastDB(sb_hbb1)
431 sp2.__doc__ = 'another sp'
432 try:
433 self.pygrData.Bio.Seq.sp2 = sp2
434 self.metabase.commit()
435 msg = 'failed to catch bad attempt to write to XMLRPC server'
436 raise KeyError(msg)
437 except ValueError:
438 pass
439 def tearDown(self):
440 'halt the test XMLRPC server'
441 self.server.close()
443 if __name__ == '__main__':
444 PygrTestProgram(verbosity=2)