1 import socket
, unittest
, os
, pickle
, datetime
2 from testlib
import testutil
, PygrTestProgram
, SkipTest
3 from pygr
import seqdb
, cnestedlist
, metabase
, mapping
, logger
, sqlgraph
4 from pygr
.downloader
import SourceURL
, GenericBuilder
, uncompress_file
, \
10 from sets
import Set
as set
class TestBase(unittest.TestCase):
    "Common fixture shared by the metabase test classes"

    def setUp(self, worldbasePath=None, **kwargs):
        """Build a MetabaseList for one test and expose its namespaces.

        worldbasePath -- location handed to metabase.MetabaseList; when it
                         is None the path of a fresh
                         testutil.TempDir('pygrdata') is used instead.
        kwargs        -- forwarded unchanged to metabase.MetabaseList.
        """
        # the TempDir is kept on the instance even when a path is supplied
        scratch = testutil.TempDir('pygrdata')
        self.tempdir = scratch
        if worldbasePath is None:
            location = scratch.path
        else:
            location = worldbasePath

        mdb = metabase.MetabaseList(location, **kwargs)
        self.metabase = mdb
        self.pygrData = mdb.Data
        self.schema = mdb.Schema

        # short alias for the equality assertion
        self.EQ = self.assertEqual
class Download_Test(TestBase):
    "Download a resource through the metabase and decompress data files"

    def _check_md5(self, path, digest):
        # common tail of every test below: hash the file, compare hex digests
        self.assertEqual(testutil.get_file_md5(path).hexdigest(), digest)

    # tested elsewhere as well, on Linux makes gzip ask for permissions
    def test_download(self):
        "Downloading of gzipped file using metabase"
        source = SourceURL('http://www.doe-mbi.ucla.edu/~leec/test.gz')
        source.__doc__ = 'test download'

        mdb = self.metabase
        mdb.add_resource('Bio.Test.Download1', source)
        mdb.commit()

        # performs the download
        local_path = self.pygrData.Bio.Test.Download1()
        self._check_md5(local_path, 'f95656496c5182d6cff9a56153c9db73')

    def test_run_unzip(self):
        'test uncompress_file unzip'
        archive = testutil.datafile('test.zip')
        target = testutil.tempdatafile('test.out')
        uncompress_file(archive, newpath=target, singleFile=True)
        self._check_md5(target, '12ada4c51ccb4c7277c16f1a3c000b90')

    def test_do_unzip(self):
        # same archive as test_run_unzip, but through do_unzip directly
        archive = testutil.datafile('test.zip')
        target = testutil.tempdatafile('test2.out')
        do_unzip(archive, target, singleFile=True)
        self._check_md5(target, '12ada4c51ccb4c7277c16f1a3c000b90')

    def test_run_gunzip(self):
        'test uncompress_file gunzip'
        archive = testutil.datafile('test.gz')
        target = testutil.tempdatafile('test3.out')
        uncompress_file(archive, newpath=target)
        self._check_md5(target, '1db5a21a01ba465fd26c3203d6589b0e')

    def test_do_gunzip(self):
        # same archive as test_run_gunzip, but through do_gunzip directly
        archive = testutil.datafile('test.gz')
        target = testutil.tempdatafile('test4.out')
        do_gunzip(archive, target)
        self._check_md5(target, '1db5a21a01ba465fd26c3203d6589b0e')
class GenericBuild_Test(TestBase):
    "Round-trip a GenericBuilder through pickle and inspect the result"

    def test_generic_build(self):
        "GenericBuilder construction of the BlastDB"
        sp_hbb1 = testutil.datafile('sp_hbb1')
        gb = GenericBuilder('BlastDB', sp_hbb1)

        # `s` used to be read without ever being assigned; pickle the
        # builder first so that there is something to load
        s = pickle.dumps(gb)
        db = pickle.loads(s)  # force construction of the BlastDB

        # compare sorted copies so the check does not depend on the order
        # in which the db happens to iterate
        found = list(db)
        found.sort()

        expected = ['HBB0_PAGBO', 'HBB1_ANAMI', 'HBB1_CYGMA', 'HBB1_IGUIG',
                    'HBB1_MOUSE', 'HBB1_ONCMY', 'HBB1_PAGBO', 'HBB1_RAT',
                    'HBB1_SPHPU', 'HBB1_TAPTE', 'HBB1_TORMA', 'HBB1_TRICR',
                    'HBB1_UROHA', 'HBB1_VAREX', 'HBB1_XENBO', 'HBB1_XENLA',
                    'HBB1_XENTR', 'MYG_DIDMA', 'MYG_ELEMA', 'MYG_ERIEU',
                    'MYG_ESCGI', 'MYG_GALCR', 'PRCA_ANASP', 'PRCA_ANAVA']
        expected.sort()

        self.EQ(expected, found)
class DNAAnnotation_Test(TestBase):
    "Annotations saved with a DNA sequence db and bound to it as `exons`"

    def setUp(self, **kwargs):
        """Save a sequence db, an annotation db and the map between them.

        kwargs -- forwarded to TestBase.setUp.
        """
        # self.pygrData, self.schema and self.metabase are created by the
        # base fixture, so it has to run before anything is stored
        TestBase.setUp(self, **kwargs)
        dnaseq = testutil.datafile('dnaseq.fasta')
        tryannot = testutil.tempdatafile('tryannot')

        db = seqdb.BlastDB(dnaseq)
        db.__doc__ = 'little dna'
        self.pygrData.Bio.Test.dna = db

        # sliceAttrDict maps attribute names to positions in each tuple.
        # name=3 is required by test_annotation, which reads `.name` and
        # expects the fourth tuple element ('fred', 'bob').
        # The sequence db is passed as second argument, as
        # populate_swissprot does for its AnnotationDB.
        annoDB = seqdb.AnnotationDB({1: ('seq1', 5, 10, 'fred'),
                                     2: ('seq1', -60, -50, 'bob'),
                                     3: ('seq2', -20, -10, 'mary')},
                                    db,
                                    sliceAttrDict=dict(id=0, start=1, stop=2,
                                                       name=3))
        annoDB.__doc__ = 'trivial annotation'
        self.pygrData.Bio.Test.annoDB = annoDB

        # NOTE(review): the tail of this call was cut off in the reviewed
        # copy; bidirectional=False is assumed -- confirm.
        nlmsa = cnestedlist.NLMSA(tryannot, 'w', pairwiseMode=True,
                                  bidirectional=False)
        for annID in annoDB:
            nlmsa.addAnnotation(annoDB[annID])
        # NOTE(review): assumed -- an NLMSA opened with 'w' presumably has
        # to be built before it can be saved and queried; confirm.
        nlmsa.build()
        nlmsa.__doc__ = 'trivial map'
        self.pygrData.Bio.Test.map = nlmsa
        self.schema.Bio.Test.map = metabase.ManyToManyRelation(
            db, annoDB, bindAttrs=('exons',))
        self.metabase.commit()
        self.metabase.clear_cache()

    def test_annotation(self):
        "Annotation test"
        db = self.pygrData.Bio.Test.dna()
        try:
            # NOTE(review): `s1`, the first `l` and `sneg` were not defined
            # in the reviewed copy; the definitions below are the ones the
            # assertions imply -- confirm.
            s1 = db['seq1']
            l = s1.exons.keys()
            annoDB = self.pygrData.Bio.Test.annoDB()
            assert l == [annoDB[1], -(annoDB[2])]
            assert l[0].sequence == s1[5:10]
            assert l[1].sequence == s1[50:60]
            assert l[0].name == 'fred', 'test annotation attribute access'
            assert l[1].name == 'bob'

            # reverse strand of the first 55 letters: annotation 2 is then
            # only partly covered, which is why its slice [5:] is expected
            sneg = -(s1[:55])
            l = sneg.exons.keys()
            assert l == [annoDB[2][5:], -(annoDB[1])]
            assert l[0].sequence == -(s1[50:55])
            assert l[1].sequence == -(s1[5:10])
            assert l[0].name == 'bob'
            assert l[1].name == 'fred'
        finally:
            # release the files even when one of the assertions fails
            db.close()  # close SequenceFileDB
            self.pygrData.Bio.Test.map().close()  # close NLMSA
def populate_swissprot(pygrData, pygrDataSchema):
    """Populate the current pygrData with swissprot data

    pygrData       -- resource namespace that receives the objects.
    pygrDataSchema -- schema namespace that receives the relations.

    Stores Bio.Seq.Swissprot.sp42, Bio.Seq.frag, Bio.Seq.spmap,
    Bio.Annotation.annoDB and Bio.Annotation.map.  Nothing is committed
    here; that is left to the caller.
    """
    # build BlastDB out of the sequences
    sp_hbb1 = testutil.datafile('sp_hbb1')
    sp = seqdb.BlastDB(sp_hbb1)
    sp.__doc__ = 'little swissprot'
    pygrData.Bio.Seq.Swissprot.sp42 = sp

    # also store a fragment
    hbb = sp['HBB1_TORMA']
    # `ival` was not defined in the reviewed copy; check_match compares
    # Bio.Seq.frag with sp42()['HBB1_TORMA'][10:35], so that slice is used
    ival = hbb[10:35]
    ival.__doc__ = 'fragment'
    pygrData.Bio.Seq.frag = ival

    # build a mapping to itself
    m = mapping.Mapping(sourceDB=sp, targetDB=sp)
    trypsin = sp['PRCA_ANAVA']
    # check_bind expects hbb.buddy == trypsin, so the pair has to be
    # entered into the mapping before it is saved
    m[hbb] = trypsin
    m.__doc__ = 'map sp to itself'
    pygrData.Bio.Seq.spmap = m

    # bind the mapping to the sequences as the `buddy` attribute
    pygrDataSchema.Bio.Seq.spmap = metabase.OneToManyRelation(
        sp, sp, bindAttrs=('buddy',))

    # create an annotation database and bind as exons attribute
    annoDB = seqdb.AnnotationDB({1: ('HBB1_TORMA', 10, 50)}, sp,
                                sliceAttrDict=dict(id=0, start=1, stop=2))
    # `exon` was not defined in the reviewed copy; it is the only
    # annotation in annoDB
    exon = annoDB[1]

    # generate the names where these will be stored
    tempdir = testutil.TempDir('exonAnnot')
    filename = tempdir.subfile('cnested')
    # NOTE(review): the tail of this call was cut off in the reviewed
    # copy; bidirectional=False is assumed -- confirm.
    nlmsa = cnestedlist.NLMSA(filename, 'w', pairwiseMode=True,
                              bidirectional=False)
    nlmsa.addAnnotation(exon)
    # NOTE(review): assumed -- an NLMSA opened with 'w' presumably has to
    # be built before it can be saved and queried; confirm.
    nlmsa.build()

    annoDB.__doc__ = 'a little annotation db'
    nlmsa.__doc__ = 'a little map'
    pygrData.Bio.Annotation.annoDB = annoDB
    pygrData.Bio.Annotation.map = nlmsa
    pygrDataSchema.Bio.Annotation.map = metabase.ManyToManyRelation(
        sp, annoDB, bindAttrs=('exons',))
def check_match(self):
    """Check the stored fragment, then the names listed under 'Bio'.

    self -- test case providing `pygrData` and `metabase`.
    Raises AssertionError on the first mismatch.
    """
    stored = self.pygrData.Bio.Seq.frag()
    reference = self.pygrData.Bio.Seq.Swissprot.sp42()['HBB1_TORMA'][10:35]
    assert stored == reference, 'seq ival should match'
    assert stored.__doc__ == 'fragment', 'docstring should match'
    assert str(stored) == 'IQHIWSNVNVVEITAKALERVFYVY', 'letters should match'
    assert len(stored) == 25, 'length should match'
    assert len(stored.path) == 142, 'length should match'

    # the five resources expected under the 'Bio' prefix
    listed = self.metabase.dir('Bio')
    wanted = [
        'Bio.Annotation.annoDB',
        'Bio.Annotation.map',
        'Bio.Seq.Swissprot.sp42',
        'Bio.Seq.frag',
        'Bio.Seq.spmap',
    ]
    assert listed == wanted
def check_dir_noargs(self):
    """Assert that dir() and dir('') return equal listings.

    self -- test case providing `metabase`.
    """
    mdb = self.metabase
    without_pattern = mdb.dir()
    empty_pattern = mdb.dir('')
    assert without_pattern == empty_pattern
def check_dir_download(self):
    """Assert that the download listing is empty with or without ''.

    self -- test case providing `metabase`.
    """
    mdb = self.metabase
    implicit = mdb.dir(download=True)
    explicit = mdb.dir('', download=True)
    assert len(implicit) == 0
    assert implicit == explicit
def check_dir_re(self):
    """Check two listings requested with a pattern and the 'r' argument.

    self -- test case providing `metabase`.
    """
    mdb = self.metabase

    # everything whose name starts with 'Bio'
    everything = mdb.dir('^Bio', 'r')
    assert everything == [
        'Bio.Annotation.annoDB',
        'Bio.Annotation.map',
        'Bio.Seq.Swissprot.sp42',
        'Bio.Seq.frag',
        'Bio.Seq.spmap',
    ]

    # only the names with a '.sp' component after the 'Bio.' prefix
    sp_only = mdb.dir(r'^Bio\..+\.sp', 'r')
    assert sp_only == ['Bio.Seq.Swissprot.sp42', 'Bio.Seq.spmap']
def check_bind(self):
    """Assert that HBB1_TORMA exposes PRCA_ANAVA as its `buddy` attribute.

    self -- test case providing `pygrData`.
    """
    swissprot = self.pygrData.Bio.Seq.Swissprot.sp42()
    source = swissprot['HBB1_TORMA']
    target = swissprot['PRCA_ANAVA']
    assert source.buddy == target, 'automatic schema attribute binding'
def check_bind2(self):
    """Check the `exons` attribute bound to the swissprot sequences.

    self -- test case providing `pygrData`.

    HBB1_TORMA must expose exactly one annotation, equal to annoDB[1];
    asking HBB1_ONCMY for its annotations must fail.

    Raises AssertionError when an annotation check fails and ValueError
    when the HBB1_ONCMY query unexpectedly succeeds.
    """
    sp = self.pygrData.Bio.Seq.Swissprot.sp42()
    hbb = sp['HBB1_TORMA']
    exons = hbb.exons.keys()
    assert len(exons) == 1, 'number of expected annotations'

    annoDB = self.pygrData.Bio.Annotation.annoDB()
    # `exon` used to be read without being assigned; annotation 1 is the
    # one that populate_swissprot stores for HBB1_TORMA
    exon = annoDB[1]
    assert exons[0] == exon, 'test annotation comparison'
    assert exons[0].pathForward is exon, 'annotation parent match'
    assert exons[0].sequence == hbb[10:50], 'annotation to sequence match'

    onc = sp['HBB1_ONCMY']
    # The failure used to be raised unconditionally; it is meant to fire
    # only when the query below does not raise.
    # NOTE(review): KeyError is assumed to be the exception for a sequence
    # without annotations -- confirm.
    try:
        exons = onc.exons.keys()
    except KeyError:
        pass
    else:
        raise ValueError('failed to catch query with no annotations')
class Sequence_Test(TestBase):
    "Run the check_* helpers and a schema test on a populated metabase"

    def setUp(self, *args, **kwargs):
        "Populate the metabase with swissprot data, save it and reload"
        TestBase.setUp(self, *args, **kwargs)
        populate_swissprot(self.pygrData, self.schema)
        self.metabase.commit()  # finally save everything
        self.metabase.clear_cache()  # force all requests to reload

    # test_match used to call check_dir_noargs, leaving check_match and
    # most other check_* helpers without a caller in this class; each
    # helper now has its own test.

    def test_match(self):
        "Test matching sequences"
        check_match(self)

    def test_dir_noargs(self):
        "Test listing without arguments"
        check_dir_noargs(self)

    def test_dir_download(self):
        "Test listing of downloadable resources"
        check_dir_download(self)

    def test_dir_re(self):
        "Test listing with a regular expression"
        check_dir_re(self)

    def test_bind(self):
        "Test the buddy attribute binding"
        check_bind(self)

    def test_bind2(self):
        "Test the exons attribute binding"
        check_bind2(self)

    def test_schema(self):
        "Test saving resources and schema relations in two rounds"
        # round one: a second copy of the db, mapped from the saved sp42
        sp_hbb1 = testutil.datafile('sp_hbb1')
        sp2 = seqdb.BlastDB(sp_hbb1)
        sp2.__doc__ = 'another sp'
        self.pygrData.Bio.Seq.sp2 = sp2
        sp = self.pygrData.Bio.Seq.Swissprot.sp42()
        m = mapping.Mapping(sourceDB=sp, targetDB=sp2)
        m.__doc__ = 'sp -> sp2'
        self.pygrData.Bio.Seq.testmap = m
        self.schema.Bio.Seq.testmap = metabase.OneToManyRelation(sp, sp2)
        self.metabase.commit()

        self.metabase.clear_cache()

        # round two: a third copy, mapped to the reloaded sp2
        sp3 = seqdb.BlastDB(sp_hbb1)
        sp3.__doc__ = 'sp number 3'
        self.pygrData.Bio.Seq.sp3 = sp3
        sp2 = self.pygrData.Bio.Seq.sp2()
        m = mapping.Mapping(sourceDB=sp3, targetDB=sp2)
        m.__doc__ = 'sp3 -> sp2'
        self.pygrData.Bio.Seq.testmap2 = m
        self.schema.Bio.Seq.testmap2 = metabase.OneToManyRelation(sp3, sp2)

        # sort a copy of the keys: the order of a mapping's keys is not
        # something this test should depend on
        l = list(self.metabase.resourceCache.keys())
        l.sort()
        self.EQ(l, ['Bio.Seq.sp2', 'Bio.Seq.sp3', 'Bio.Seq.testmap2'])

        self.metabase.commit()
        g = self.metabase.writer.storage.graph
        expected = set(['Bio.Annotation.annoDB',
                        'Bio.Seq.Swissprot.sp42', 'Bio.Seq.sp2',
                        'Bio.Seq.sp3'])
        found = set(g.keys())
        # every expected name must be a key of the stored graph
        self.EQ(len(expected - found), 0)
class SQL_Sequence_Test(Sequence_Test):
    "Repeat the Sequence_Test cases with a 'mysql:' worldbase path"

    # NOTE(review): the method headers and the body of the `if` were
    # missing from the reviewed copy.  The setUp / tearDown split and the
    # skip below are reconstructed -- confirm.

    def setUp(self):
        "Point the fixture at temporary tables, or skip without MySQL"
        if not testutil.mysql_enabled():
            raise SkipTest('MySQL is not available')

        self.dbtable = testutil.temp_table_name()  # create temp db tables
        Sequence_Test.setUp(self, worldbasePath='mysql:' + self.dbtable,
                            mdbArgs=dict(createLayer='temp'))

    def tearDown(self):
        "Drop the tables used by this test"
        testutil.drop_tables(self.metabase.writer.storage.cursor,
                             self.dbtable)
class InvalidPickle_Test(TestBase):
    "metabase.dumps must reject an object whose class cannot be found"

    def setUp(self):
        "Prepare one object expected to pickle and one expected to fail"
        # same base-fixture call as the other setUp overrides in this file
        TestBase.setUp(self)

        class MyUnpicklableClass(object):
            pass
        # label the class as belonging to '__main__'
        MyUnpicklableClass.__module__ = '__main__'
        self.bad = MyUnpicklableClass()

        self.good = datetime.datetime.today()

    def test_invalid_pickle(self):
        "Testing an invalid pickle"
        metabase.dumps(self.good)  # should pickle with no errors

        # the `except` had no matching `try`; only the second dumps call is
        # expected to fail, so only that call is guarded
        try:
            metabase.dumps(self.bad)  # should raise exception
        except metabase.WorldbaseNoModuleError:
            pass
        else:
            msg = 'failed to catch bad attempt to invalid module ref'
            raise ValueError(msg)
class DBServerInfo_Test(TestBase):
    "Save a GraphView over a remote database and query it after reload"

    # NOTE(review): in the reviewed copy the setUp header was missing, the
    # two SQLTable calls, the GraphView call and the SQL literal were not
    # closed, and the `correct` list was truncated.  Everything marked
    # below is reconstructed -- confirm.

    def setUp(self):
        "Build the translation -> exon view and save it in the metabase"
        TestBase.setUp(self)
        logger.debug('accessing ensembldb.ensembl.org')
        conn = sqlgraph.DBServerInfo(host='ensembldb.ensembl.org',
                                     user='anonymous', passwd='')
        # NOTE(review): `conn` had no visible use; passing it as
        # serverInfo= and treating ImportError as the skip condition are
        # assumptions.
        try:
            translationDB = sqlgraph.SQLTable(
                'homo_sapiens_core_47_36i.translation', serverInfo=conn)
            exonDB = sqlgraph.SQLTable(
                'homo_sapiens_core_47_36i.exon', serverInfo=conn)

            # NOTE(review): the line breaks and leading whitespace inside
            # the original literal are unknown; only the SQL text itself
            # is taken from the reviewed copy.
            sql_statement = '''SELECT t3.exon_id FROM
homo_sapiens_core_47_36i.translation AS tr,
homo_sapiens_core_47_36i.exon_transcript AS t1,
homo_sapiens_core_47_36i.exon_transcript AS t2,
homo_sapiens_core_47_36i.exon_transcript AS t3 WHERE tr.translation_id = %s
AND tr.transcript_id = t1.transcript_id AND t1.transcript_id =
t2.transcript_id AND t2.transcript_id = t3.transcript_id AND t1.exon_id =
tr.start_exon_id AND t2.exon_id = tr.end_exon_id AND t3.rank >= t1.rank AND
t3.rank <= t2.rank ORDER BY t3.rank
'''
            translationExons = sqlgraph.GraphView(translationDB, exonDB,
                                                  sql_statement,
                                                  serverInfo=conn)
        except ImportError:
            raise SkipTest('missing MySQLdb module?')

        translationExons.__doc__ = 'test saving exon graph'
        self.pygrData.Bio.Ensembl.TranslationExons = translationExons
        self.metabase.commit()
        self.metabase.clear_cache()

    def test_orderBy(self):
        """Test saving DBServerInfo to metabase"""
        translationExons = self.pygrData.Bio.Ensembl.TranslationExons()
        translation = translationExons.sourceDB[15121]
        exons = translationExons[translation]  # do the query
        result = [e.id for e in exons]
        # NOTE(review): this list was cut off after the ninth id in the
        # reviewed copy.  Only the known prefix is compared; restore the
        # remaining ids and compare the whole list.
        correct = [95160, 95020, 95035, 95050, 95059, 95069, 95081, 95088,
                   95101]
        # make sure the exact order matches, as far as it is known
        self.assertEqual(result[:len(correct)], correct)
class XMLRPC_Test(TestBase):
    'create an XMLRPC server and access seqdb from it'

    # NOTE(review): the method headers, the try/except around the write
    # attempt and the tearDown body were missing from the reviewed copy and
    # are reconstructed -- confirm.  Further check_* calls may also have
    # been lost from test_xmlrpc.

    def setUp(self):
        "Save the swissprot resources and start a server for them"
        TestBase.setUp(self)
        populate_swissprot(self.pygrData, self.schema)  # save some data
        self.metabase.commit()  # finally save everything
        self.metabase.clear_cache()  # force all requests to reload

        res = ['Bio.Seq.Swissprot.sp42', 'Bio.Seq.frag', 'Bio.Seq.spmap',
               'Bio.Annotation.annoDB', 'Bio.Annotation.map']
        self.server = testutil.TestXMLRPCServer(res, self.tempdir.path)

    def test_xmlrpc(self):
        "Test XMLRPC"
        self.metabase.clear_cache()  # force all requests to reload
        self.metabase.update("http://localhost:%s" % self.server.port)

        check_dir_noargs(self)
        check_dir_download(self)

        sb_hbb1 = testutil.datafile('sp_hbb1')
        sp2 = seqdb.BlastDB(sb_hbb1)
        sp2.__doc__ = 'another sp'

        # saving a new resource is expected to be refused here
        # NOTE(review): the expected exception type was not visible;
        # ValueError is assumed.
        try:
            self.pygrData.Bio.Seq.sp2 = sp2
            self.metabase.commit()
        except ValueError:
            pass
        else:
            msg = 'failed to catch bad attempt to write to XMLRPC server'
            raise ValueError(msg)

    def tearDown(self):
        'halt the test XMLRPC server'
        # NOTE(review): only the docstring of this method was visible;
        # close() is assumed to be how TestXMLRPCServer is stopped.
        self.server.close()
# when run as a script, hand control to the PygrTestProgram imported from
# testlib, asking for verbosity level 2
if __name__ == '__main__':
    PygrTestProgram(verbosity=2)