1 import socket
, unittest
, os
, md5
, pickle
, datetime
4 from testlib
import testutil
, SkipTest
, PygrTestProgram
5 from pygr
import seqdb
, cnestedlist
, metabase
, mapping
6 from pygr
import worldbase
7 from pygr
.downloader
import SourceURL
, GenericBuilder
12 from sets
import Set
as set
class TestBase(unittest.TestCase):
    "A base class to all worldbase test classes"

    def setUp(self, worldbasePath=None, **kwargs):
        """Point worldbase at a scratch location before each test.

        worldbasePath -- location handed to worldbase.update(); when None,
            the path of the temporary directory created here is used.
        kwargs -- forwarded unchanged to worldbase.update().

        The temporary directory is always created and kept on
        self.tempdir, even when an explicit worldbasePath is given.
        """
        # overwrite the WORLDBASEPATH environment variable
        self.tempdir = testutil.TempDir('pygrdata')
        if worldbasePath is None:
            worldbasePath = self.tempdir.path
        worldbase.update(worldbasePath, **kwargs)

        # short alias used by the test methods
        self.EQ = self.assertEqual
class Download_Test(TestBase):
    "Save seq db and interval to worldbase shelve"

    # tested elsewhere as well, on Linux makes gzip ask for permissions
    def test_download(self):
        "Downloading of gzipped file using worldbase"

        url = SourceURL('http://www.doe-mbi.ucla.edu/~leec/test.gz')
        url.__doc__ = 'test download'

        worldbase.add_resource('Bio.Test.Download1', url)
        # NOTE(review): every other test in this file commits a resource
        # before requesting it back; the commit is restored here on that
        # basis -- confirm it is wanted for downloads too.
        worldbase.commit()

        # performs the download
        fpath = worldbase.Bio.Test.Download1()
        try:
            h = testutil.get_file_md5(fpath)
            self.assertEqual(h.hexdigest(),
                             'f95656496c5182d6cff9a56153c9db73')
        finally:
            # do not leave the downloaded file behind, pass or fail
            os.remove(fpath)
class GenericBuild_Test(TestBase):
    "Check that unpickling a GenericBuilder yields a usable BlastDB"

    def test_generic_build(self):
        "GenericBuilder construction of the BlastDB"

        sp_hbb1 = testutil.datafile('sp_hbb1')
        gb = GenericBuilder('BlastDB', sp_hbb1)

        # The builder is round-tripped through pickle; `s` was referenced
        # but never assigned in the damaged listing.
        s = pickle.dumps(gb)
        db = pickle.loads(s) # force construction of the BlastDB

        # compare sorted lists so the result does not depend on the order
        # in which the database yields its sequence IDs
        found = [x for x in db]
        found.sort()

        expected = ['HBB0_PAGBO', 'HBB1_ANAMI', 'HBB1_CYGMA', 'HBB1_IGUIG',
                    'HBB1_MOUSE', 'HBB1_ONCMY', 'HBB1_PAGBO', 'HBB1_RAT',
                    'HBB1_SPHPU', 'HBB1_TAPTE', 'HBB1_TORMA', 'HBB1_TRICR',
                    'HBB1_UROHA', 'HBB1_VAREX', 'HBB1_XENBO', 'HBB1_XENLA',
                    'HBB1_XENTR', 'MYG_DIDMA', 'MYG_ELEMA', 'MYG_ERIEU',
                    'MYG_ESCGI', 'MYG_GALCR', 'PRCA_ANASP', 'PRCA_ANAVA']
        expected.sort()

        self.EQ(expected, found)
class DNAAnnotation_Test(TestBase):
    "Store a DNA database, annotations on it and their map, then query them"

    def setUp(self, **kwargs):
        """Save the DNA database, its annotation DB and their NLMSA map.

        kwargs -- forwarded to TestBase.setUp().

        Resources are stored as Bio.Test.dna, Bio.Test.annoDB and
        Bio.Test.map, with a schema relation that binds the map to
        sequences as the `exons` attribute.
        """
        TestBase.setUp(self, **kwargs)
        dnaseq = testutil.datafile('dnaseq.fasta')
        tryannot = testutil.tempdatafile('tryannot')

        db = seqdb.BlastDB(dnaseq)
        db.__doc__ = 'little dna'
        worldbase.Bio.Test.dna = db

        # Each value is (sequence id, start, stop, name).  The `db`
        # argument and the `name=3` slot were missing from the damaged
        # listing; they follow from populate_swissprot() and from the
        # `.name` assertions in test_annotation().
        annoDB = seqdb.AnnotationDB({1: ('seq1', 5, 10, 'fred'),
                                     2: ('seq1', -60, -50, 'bob'),
                                     3: ('seq2', -20, -10, 'mary')},
                                    db,
                                    sliceAttrDict=dict(id=0, start=1, stop=2,
                                                       name=3))
        annoDB.__doc__ = 'trivial annotation'
        worldbase.Bio.Test.annoDB = annoDB

        # NOTE(review): the call was cut off after `pairwiseMode=True,`;
        # bidirectional=False is restored as the lost argument -- confirm.
        nlmsa = cnestedlist.NLMSA(tryannot, 'w', pairwiseMode=True,
                                  bidirectional=False)
        for annID in annoDB:
            nlmsa.addAnnotation(annoDB[annID])
        # NOTE(review): build() restored; an NLMSA opened in 'w' mode is
        # assumed to need building before it can be queried -- confirm.
        nlmsa.build()
        nlmsa.__doc__ = 'trivial map'
        worldbase.Bio.Test.map = nlmsa
        worldbase.schema.Bio.Test.map = metabase.ManyToManyRelation(
            db, annoDB, bindAttrs=('exons',))

        # NOTE(review): commit restored before clearing the cache, as in
        # the other setUp methods of this file -- confirm.
        worldbase.commit()
        worldbase.clear_cache()

    def test_annotation(self):
        "Query annotations through the schema-bound exons attribute"
        db = worldbase.Bio.Test.dna()
        try:
            # forward strand of seq1: 'fred' as stored, 'bob' reversed
            s1 = db['seq1']
            exons = s1.exons.keys()
            annoDB = worldbase.Bio.Test.annoDB()
            assert exons == [annoDB[1], -(annoDB[2])]
            assert exons[0].sequence == s1[5:10]
            assert exons[1].sequence == s1[50:60]
            assert exons[0].name == 'fred', 'test annotation attribute access'
            assert exons[1].name == 'bob'

            # NOTE(review): `sneg` was undefined in the damaged listing.
            # The assertions below expect only s1[50:55] of 'bob' plus the
            # whole of 'fred', which matches the reverse strand of
            # s1[:55] -- confirm.
            sneg = -(s1[:55])
            exons = sneg.exons.keys()
            assert exons == [annoDB[2][5:], -(annoDB[1])]
            assert exons[0].sequence == -(s1[50:55])
            assert exons[1].sequence == -(s1[5:10])
            assert exons[0].name == 'bob'
            assert exons[1].name == 'fred'
        finally:
            # release the files even when an assertion fails
            db.close() # close SequenceFileDB
            worldbase.Bio.Test.map().close() # close NLMSA
def populate_swissprot():
    """Populate the current worldbase with swissprot data.

    Stores (without committing):
      Bio.Seq.Swissprot.sp42 -- BlastDB built from the sp_hbb1 data file
      Bio.Seq.frag           -- the interval HBB1_TORMA[10:35]
      Bio.Seq.spmap          -- mapping of sp42 onto itself, bound to
                                sequences as the `buddy` attribute
      Bio.Annotation.annoDB  -- one annotation on HBB1_TORMA[10:50]
      Bio.Annotation.map     -- NLMSA holding that annotation, bound to
                                sequences as the `exons` attribute
    """
    # build BlastDB out of the sequences
    sp_hbb1 = testutil.datafile('sp_hbb1')
    sp = seqdb.BlastDB(sp_hbb1)
    sp.__doc__ = 'little swissprot'
    worldbase.Bio.Seq.Swissprot.sp42 = sp

    # also store a fragment; check_match() compares it to [10:35]
    hbb = sp['HBB1_TORMA']
    ival = hbb[10:35]
    ival.__doc__ = 'fragment'
    worldbase.Bio.Seq.frag = ival

    # build a mapping to itself; check_bind() expects hbb.buddy == trypsin
    m = mapping.Mapping(sourceDB=sp, targetDB=sp)
    trypsin = sp['PRCA_ANAVA']
    m[hbb] = trypsin
    m.__doc__ = 'map sp to itself'
    worldbase.Bio.Seq.spmap = m
    worldbase.schema.Bio.Seq.spmap = metabase.OneToManyRelation(
        sp, sp, bindAttrs=('buddy',))

    # create an annotation database and bind as exons attribute
    annoDB = seqdb.AnnotationDB({1: ('HBB1_TORMA', 10, 50)}, sp,
                                sliceAttrDict=dict(id=0, start=1, stop=2))
    exon = annoDB[1]

    # generate the names where these will be stored
    tempdir = testutil.TempDir('exonAnnot')
    filename = tempdir.subfile('cnested')
    # NOTE(review): the call was cut off after `pairwiseMode=True,`;
    # bidirectional=False is restored as the lost argument -- confirm.
    nlmsa = cnestedlist.NLMSA(filename, 'w', pairwiseMode=True,
                              bidirectional=False)
    nlmsa.addAnnotation(exon)
    # NOTE(review): build() restored; an NLMSA opened in 'w' mode is
    # assumed to need building before it can be queried -- confirm.
    nlmsa.build()

    annoDB.__doc__ = 'a little annotation db'
    nlmsa.__doc__ = 'a little map'
    worldbase.Bio.Annotation.annoDB = annoDB
    worldbase.Bio.Annotation.map = nlmsa
    worldbase.schema.Bio.Annotation.map = \
        metabase.ManyToManyRelation(sp, annoDB, bindAttrs=('exons',))
def check_match(self):
    """Check the stored fragment, then the 'Bio' resource listing.

    self -- the running test case (unused here; kept so all check_*
        helpers share one calling convention).
    """
    frag = worldbase.Bio.Seq.frag()
    correct = worldbase.Bio.Seq.Swissprot.sp42()['HBB1_TORMA'][10:35]
    assert frag == correct, 'seq ival should match'
    assert frag.__doc__ == 'fragment', 'docstring should match'
    assert str(frag) == 'IQHIWSNVNVVEITAKALERVFYVY', 'letters should match'
    assert len(frag) == 25, 'length should match'
    assert len(frag.path) == 142, 'length should match'

    #store = PygrDataTextFile('results/seqdb1.pickle')
    #saved = store['hbb1 fragment']
    #assert frag == saved, 'seq ival should matched stored result'

    # the listing check lives in its own helper, like the other check_dir_*
    check_dir(self)


def check_dir(self):
    "Check that worldbase.dir('Bio') lists exactly the populated resources"
    expected = ['Bio.Annotation.annoDB', 'Bio.Annotation.map',
                'Bio.Seq.Swissprot.sp42', 'Bio.Seq.frag', 'Bio.Seq.spmap']
    expected.sort()

    # sort so the comparison does not depend on listing order
    found = list(worldbase.dir('Bio'))
    found.sort()

    assert found == expected
def check_dir_noargs(self):
    "Check that worldbase.dir() and worldbase.dir('') give the same listing"
    found = worldbase.dir()

    found2 = worldbase.dir('')

    assert found == found2
def check_dir_download(self):
    "Check that the download=True listing is empty, with or without ''"
    found = worldbase.dir(download=True)

    found2 = worldbase.dir('', download=True)

    assert len(found) == 0
    assert found == found2
def check_dir_re(self):
    "Check worldbase.dir() when called with a pattern and the 'r' flag"
    expected = ['Bio.Annotation.annoDB', 'Bio.Annotation.map',
                'Bio.Seq.Swissprot.sp42', 'Bio.Seq.frag', 'Bio.Seq.spmap']
    expected.sort()

    # sort so the comparison does not depend on listing order
    found = list(worldbase.dir('^Bio', 'r'))
    found.sort()

    assert found == expected

    expected = ['Bio.Seq.Swissprot.sp42', 'Bio.Seq.spmap']
    expected.sort()

    # raw string: same pattern text, without relying on '\.' being left
    # alone as an unrecognised escape sequence
    found = list(worldbase.dir(r'^Bio\..+\.sp', 'r'))
    found.sort()

    assert found == expected
def check_bind(self):
    "Check that the schema binds HBB1_TORMA.buddy to PRCA_ANAVA"
    sp = worldbase.Bio.Seq.Swissprot.sp42()
    hbb = sp['HBB1_TORMA']
    trypsin = sp['PRCA_ANAVA']
    assert hbb.buddy == trypsin, 'automatic schema attribute binding'
def check_bind2(self):
    """Check the schema-bound `exons` attribute on sp42 sequences.

    HBB1_TORMA must report exactly one annotation, equal to annotation 1
    of Bio.Annotation.annoDB; HBB1_ONCMY has none, so querying it must
    fail.
    """
    sp = worldbase.Bio.Seq.Swissprot.sp42()
    hbb = sp['HBB1_TORMA']
    exons = hbb.exons.keys()
    assert len(exons) == 1, 'number of expected annotations'

    annoDB = worldbase.Bio.Annotation.annoDB()
    # `exon` was referenced but never assigned in the damaged listing;
    # annotation 1 is the only entry populate_swissprot() stores.
    exon = annoDB[1]
    assert exons[0] == exon, 'test annotation comparison'
    assert exons[0].pathForward is exon, 'annotation parent match'
    assert exons[0].sequence == hbb[10:50], 'annotation to sequence match'

    onc = sp['HBB1_ONCMY']
    try:
        exons = onc.exons.keys()
        raise ValueError('failed to catch query with no annotations')
    except KeyError:
        # NOTE(review): the handler was missing from the damaged listing;
        # KeyError is assumed to be what a query on an unannotated
        # sequence raises -- confirm.
        pass
class Sequence_Test(TestBase):
    "Run the shared check_* helpers against a freshly populated worldbase"

    def setUp(self, *args, **kwargs):
        """Populate, commit and reload worldbase before each test.

        args, kwargs -- forwarded unchanged to TestBase.setUp().
        """
        TestBase.setUp(self, *args, **kwargs)
        # the damaged listing committed without storing anything first;
        # XMLRPC_Test.setUp shows the populate/commit/clear sequence
        populate_swissprot() # save some data
        worldbase.commit() # finally save everything
        worldbase.clear_cache() # force all requests to reload

    def test_match(self):
        "Test matching sequences"
        check_match(self)

    def test_dir(self):
        "Test resource listings"
        check_dir_noargs(self)
        check_dir_re(self)

    def test_bind(self):
        "Test schema attribute binding"
        check_bind(self)
        check_bind2(self)

    def test_schema(self):
        "Test that schema relations pull in the resources they depend on"
        sp_hbb1 = testutil.datafile('sp_hbb1')
        sp2 = seqdb.BlastDB(sp_hbb1)
        sp2.__doc__ = 'another sp'
        worldbase.Bio.Seq.sp2 = sp2
        sp = worldbase.Bio.Seq.Swissprot.sp42()
        m = mapping.Mapping(sourceDB=sp, targetDB=sp2)
        m.__doc__ = 'sp -> sp2'
        worldbase.Bio.Seq.testmap = m
        worldbase.schema.Bio.Seq.testmap = metabase.OneToManyRelation(sp, sp2)
        # NOTE(review): commit restored so that sp2 can be requested back
        # after the cache is cleared, as setUp does -- confirm.
        worldbase.commit()

        worldbase.clear_cache()

        sp3 = seqdb.BlastDB(sp_hbb1)
        sp3.__doc__ = 'sp number 3'
        worldbase.Bio.Seq.sp3 = sp3
        sp2 = worldbase.Bio.Seq.sp2()
        m = mapping.Mapping(sourceDB=sp3, targetDB=sp2)
        m.__doc__ = 'sp3 -> sp2'
        worldbase.Bio.Seq.testmap2 = m
        worldbase.schema.Bio.Seq.testmap2 = metabase.OneToManyRelation(sp3,
                                                                       sp2)

        # sort the cache keys so the check does not depend on their order
        cached = list(worldbase._mdb.resourceCache.keys())
        cached.sort()
        assert cached == ['Bio.Seq.sp2', 'Bio.Seq.sp3', 'Bio.Seq.testmap2']

        # NOTE(review): commit restored so the stored schema graph read
        # below includes sp3 -- confirm.
        worldbase.commit()
        g = worldbase._mdb.writer.storage.graph
        expected = set(['Bio.Annotation.annoDB',
                        'Bio.Seq.Swissprot.sp42', 'Bio.Seq.sp2',
                        'Bio.Seq.sp3'])
        found = set(g.keys())
        # every expected resource must be a node of the graph
        self.EQ(len(expected - found), 0)
class SQL_Sequence_Test(Sequence_Test):
    "Run the Sequence_Test cases against a MySQL-backed worldbase"

    def setUp(self):
        "Create a temporary table name and populate worldbase through MySQL"
        if not testutil.mysql_enabled():
            # call form of raise: valid on Python 2 and 3 alike
            raise SkipTest("no MySQL installed")

        self.dbtable = testutil.temp_table_name() # create temp db tables
        Sequence_Test.setUp(self, worldbasePath='mysql:' + self.dbtable,
                            mdbArgs=dict(createLayer='temp'))

    def tearDown(self):
        "Drop the temporary tables created for this test"
        testutil.drop_tables(worldbase._mdb.writer.storage.cursor,
                             self.dbtable)
class InvalidPickle_Test(TestBase):
    "Check that metabase.dumps() rejects objects of unimportable classes"

    def setUp(self):
        "Prepare one picklable object and one that must be rejected"
        TestBase.setUp(self)

        # a class defined inside a function, then relabelled as living in
        # '__main__'; its instance is the one dumps() is expected to refuse
        class MyUnpicklableClass(object):
            pass
        MyUnpicklableClass.__module__ = '__main__'
        self.bad = MyUnpicklableClass()

        self.good = datetime.datetime.today()

    def test_invalid_pickle(self):
        "Testing an invalid pickle"
        s = metabase.dumps(self.good) # should pickle with no errors
        try:
            s = metabase.dumps(self.bad) # should raise exception
            msg = 'failed to catch bad attempt to invalid module ref'
            raise ValueError(msg)
        except metabase.WorldbaseNoModuleError:
            pass
class XMLRPC_Test(TestBase):
    'create an XMLRPC server and access seqdb from it'

    def setUp(self):
        "Populate worldbase, then serve its resources over XMLRPC"
        TestBase.setUp(self)
        populate_swissprot() # save some data
        worldbase.commit() # finally save everything to metabase
        worldbase.clear_cache() # force all requests to reload

        res = ['Bio.Seq.Swissprot.sp42', 'Bio.Seq.frag', 'Bio.Seq.spmap',
               'Bio.Annotation.annoDB', 'Bio.Annotation.map']
        self.server = testutil.TestXMLRPCServer(res, self.tempdir.path)

    def test_xmlrpc(self):
        "Test XMLRPC"
        worldbase.clear_cache() # force all future requests to reload
        worldbase.update("http://localhost:%s" % self.server.port) # from XMLRPC

        check_match(self) # run all our tests
        check_dir_noargs(self)
        check_dir_download(self)
        check_dir_re(self)
        check_bind(self)
        check_bind2(self)

        sb_hbb1 = testutil.datafile('sp_hbb1') # test readonly checks
        sp2 = seqdb.BlastDB(sb_hbb1)
        sp2.__doc__ = 'another sp'
        try:
            worldbase.Bio.Seq.sp2 = sp2
            # NOTE(review): commit restored so the write is actually
            # attempted before the failure is reported -- confirm.
            worldbase.commit()
            msg = 'failed to catch bad attempt to write to XMLRPC server'
            # reported as KeyError so the handler below cannot swallow it
            raise KeyError(msg)
        except ValueError:
            # NOTE(review): the handler was missing from the damaged
            # listing; ValueError is assumed to be what a write to the
            # read-only XMLRPC metabase raises -- confirm.
            pass

    def tearDown(self):
        'halt the test XMLRPC server'
        # NOTE(review): the body was missing from the damaged listing;
        # assumes TestXMLRPCServer exposes close() -- confirm.
        self.server.close()
# run the whole module verbosely when executed as a script
if __name__ == '__main__':
    PygrTestProgram(verbosity=2)