added SQLTable_Test.test_orderBy()
[pygr.git] / tests / worldbase_test.py
blob25b4760f8597bfef8276a8f488d4cac8c6dfc063
1 import socket, unittest, os, md5, pickle, datetime
3 import testlib
4 from testlib import testutil, SkipTest, PygrTestProgram
5 from pygr import seqdb, cnestedlist, metabase, mapping
6 from pygr import worldbase
7 from pygr.downloader import SourceURL, GenericBuilder
try:
    set  # use the builtin set type when the interpreter provides one
except NameError:
    # no builtin set: fall back to the implementation in the sets module
    from sets import Set as set
class TestBase(unittest.TestCase):
    """Common fixture shared by the worldbase test cases in this module."""

    def setUp(self, worldbasePath=None, **kwargs):
        """Point worldbase at a scratch location before each test.

        worldbasePath -- location passed to worldbase.update(); when None,
                         the path of a new temporary directory is used
        kwargs        -- forwarded unchanged to worldbase.update()
        """
        self.tempdir = testutil.TempDir('pygrdata')
        if worldbasePath is not None:
            target = worldbasePath
        else:
            target = self.tempdir.path
        # per the original note this overrides the WORLDBASEPATH setting
        worldbase.update(target, **kwargs)
        # short alias used by the test methods
        self.EQ = self.assertEqual
class Download_Test(TestBase):
    """Register a SourceURL in worldbase, fetch it and verify its MD5."""

    # tested elsewhere as well; on Linux gzip asks for permission to
    # overwrite (original note), so the downloaded file is always removed
    def test_download(self):
        "Downloading of gzipped file using worldbase"
        url = SourceURL('http://www.doe-mbi.ucla.edu/~leec/test.gz')
        url.__doc__ = 'test download'

        worldbase.add_resource('Bio.Test.Download1', url)
        worldbase.commit()

        # requesting the resource performs the download
        fpath = worldbase.Bio.Test.Download1()
        try:
            h = testutil.get_file_md5(fpath)
            self.assertEqual(h.hexdigest(),
                             'f95656496c5182d6cff9a56153c9db73')
        finally:
            # clean up even when the checksum comparison fails
            os.remove(fpath)
class GenericBuild_Test(TestBase):
    """Round-trip a GenericBuilder through pickle and inspect the result."""

    def test_generic_build(self):
        "GenericBuilder construction of the BlastDB"
        builder = GenericBuilder('BlastDB', testutil.datafile('sp_hbb1'))
        # unpickling forces construction of the BlastDB
        db = pickle.loads(pickle.dumps(builder))
        self.EQ(len(db), 24)

        found = list(db)
        found.sort()

        expected = [
            'HBB0_PAGBO', 'HBB1_ANAMI', 'HBB1_CYGMA', 'HBB1_IGUIG',
            'HBB1_MOUSE', 'HBB1_ONCMY', 'HBB1_PAGBO', 'HBB1_RAT',
            'HBB1_SPHPU', 'HBB1_TAPTE', 'HBB1_TORMA', 'HBB1_TRICR',
            'HBB1_UROHA', 'HBB1_VAREX', 'HBB1_XENBO', 'HBB1_XENLA',
            'HBB1_XENTR', 'MYG_DIDMA', 'MYG_ELEMA', 'MYG_ERIEU',
            'MYG_ESCGI', 'MYG_GALCR', 'PRCA_ANASP', 'PRCA_ANAVA',
        ]
        expected.sort()

        self.EQ(expected, found)
class DNAAnnotation_Test(TestBase):
    """Store a DNA database, an annotation database and their NLMSA map in
    worldbase, then read annotations back through the bound 'exons'
    attribute."""

    def setUp(self, **kwargs):
        """Build and commit the Bio.Test.dna / annoDB / map resources.

        kwargs -- forwarded to TestBase.setUp() (previously accepted but
                  silently dropped)
        """
        TestBase.setUp(self, **kwargs)
        dnaseq = testutil.datafile('dnaseq.fasta')
        tryannot = testutil.tempdatafile('tryannot')

        db = seqdb.BlastDB(dnaseq)
        try:
            db.__doc__ = 'little dna'
            worldbase.Bio.Test.dna = db

            # each row is (id, start, stop, name), as sliceAttrDict declares
            annoDB = seqdb.AnnotationDB(
                {1: ('seq1', 5, 10, 'fred'),
                 2: ('seq1', -60, -50, 'bob'),
                 3: ('seq2', -20, -10, 'mary')},
                db,
                sliceAttrDict=dict(id=0, start=1, stop=2, name=3))
            annoDB.__doc__ = 'trivial annotation'
            worldbase.Bio.Test.annoDB = annoDB

            nlmsa = cnestedlist.NLMSA(tryannot, 'w', pairwiseMode=True,
                                      bidirectional=False)
            try:
                for annID in annoDB:
                    nlmsa.addAnnotation(annoDB[annID])

                nlmsa.build()
                nlmsa.__doc__ = 'trivial map'
                worldbase.Bio.Test.map = nlmsa
                # schema entry binds the map to sequences as 'exons'
                worldbase.schema.Bio.Test.map = metabase.ManyToManyRelation(
                    db, annoDB, bindAttrs=('exons',))
                worldbase.commit()
                worldbase.clear_cache()
            finally:
                nlmsa.close()
        finally:
            db.close()

    def test_annotation(self):
        "Annotation test"
        db = worldbase.Bio.Test.dna()
        try:
            s1 = db['seq1']
            exons = s1.exons.keys()
            annoDB = worldbase.Bio.Test.annoDB()
            assert exons == [annoDB[1], -(annoDB[2])]
            assert exons[0].sequence == s1[5:10]
            assert exons[1].sequence == s1[50:60]
            assert exons[0].name == 'fred', \
                'test annotation attribute access'
            assert exons[1].name == 'bob'

            # same query from the reverse strand of the first 55 letters
            sneg = -(s1[:55])
            exons = sneg.exons.keys()
            assert exons == [annoDB[2][5:], -(annoDB[1])]
            assert exons[0].sequence == -(s1[50:55])
            assert exons[1].sequence == -(s1[5:10])
            assert exons[0].name == 'bob'
            assert exons[1].name == 'fred'
        finally:
            db.close()  # close SequenceFileDB
            worldbase.Bio.Test.map().close()  # close NLMSA
def populate_swissprot():
    """Register the small swissprot test resources in the current worldbase.

    Assigns Bio.Seq.Swissprot.sp42, Bio.Seq.frag, Bio.Seq.spmap,
    Bio.Annotation.annoDB and Bio.Annotation.map, plus schema relations
    for the two maps.  This function does not call worldbase.commit().
    """
    # sequence database built from the sp_hbb1 data file
    swissprot = seqdb.BlastDB(testutil.datafile('sp_hbb1'))
    swissprot.__doc__ = 'little swissprot'
    worldbase.Bio.Seq.Swissprot.sp42 = swissprot

    # a sequence interval stored as a resource of its own
    hbb = swissprot['HBB1_TORMA']
    fragment = hbb[10:35]
    fragment.__doc__ = 'fragment'
    worldbase.Bio.Seq.frag = fragment

    # mapping from the database onto itself
    selfmap = mapping.Mapping(sourceDB=swissprot, targetDB=swissprot)
    selfmap[hbb] = swissprot['PRCA_ANAVA']
    selfmap.__doc__ = 'map sp to itself'
    worldbase.Bio.Seq.spmap = selfmap

    # schema for the mapping: bound on sequences as the 'buddy' attribute
    worldbase.schema.Bio.Seq.spmap = metabase.OneToManyRelation(
        swissprot, swissprot, bindAttrs=('buddy',))

    # annotation database holding a single annotation on HBB1_TORMA
    annoDB = seqdb.AnnotationDB(
        {1: ('HBB1_TORMA', 10, 50)}, swissprot,
        sliceAttrDict=dict(id=0, start=1, stop=2))
    exon = annoDB[1]

    # file name under a temporary directory where the NLMSA is written
    tempdir = testutil.TempDir('exonAnnot')
    filename = tempdir.subfile('cnested')
    nlmsa = cnestedlist.NLMSA(filename, 'w', pairwiseMode=True,
                              bidirectional=False)
    nlmsa.addAnnotation(exon)
    nlmsa.build()

    annoDB.__doc__ = 'a little annotation db'
    nlmsa.__doc__ = 'a little map'
    worldbase.Bio.Annotation.annoDB = annoDB
    worldbase.Bio.Annotation.map = nlmsa
    # schema for the annotation map: bound on sequences as 'exons'
    worldbase.schema.Bio.Annotation.map = metabase.ManyToManyRelation(
        swissprot, annoDB, bindAttrs=('exons',))
def check_match(self):
    """Check that Bio.Seq.frag equals HBB1_TORMA[10:35] from sp42.

    self is accepted so test methods can call check_match(self); it is
    not used here.
    """
    stored = worldbase.Bio.Seq.frag()
    swissprot = worldbase.Bio.Seq.Swissprot.sp42()
    reference = swissprot['HBB1_TORMA'][10:35]

    assert stored == reference, 'seq ival should match'
    assert stored.__doc__ == 'fragment', 'docstring should match'
    assert str(stored) == 'IQHIWSNVNVVEITAKALERVFYVY', 'letters should match'
    assert len(stored) == 25, 'length should match'
    assert len(stored.path) == 142, 'length should match'

    # disabled comparison against a stored result, kept for reference:
    #store = PygrDataTextFile('results/seqdb1.pickle')
    #saved = store['hbb1 fragment']
    #assert frag == saved, 'seq ival should matched stored result'
def check_dir(self):
    """Check that worldbase.dir('Bio') lists the five expected resources.

    self is unused; it only mirrors the call style of the test methods.
    """
    listed = worldbase.dir('Bio')
    listed.sort()

    wanted = [
        'Bio.Annotation.annoDB',
        'Bio.Annotation.map',
        'Bio.Seq.Swissprot.sp42',
        'Bio.Seq.frag',
        'Bio.Seq.spmap',
    ]
    wanted.sort()

    assert listed == wanted
def check_dir_noargs(self):
    """Check that worldbase.dir() and worldbase.dir('') list the same names.

    self is unused.
    """
    without_prefix = worldbase.dir()
    without_prefix.sort()
    empty_prefix = worldbase.dir('')
    empty_prefix.sort()
    assert without_prefix == empty_prefix
def check_dir_download(self):
    """Check that dir(download=True) is empty with and without a prefix.

    self is unused.
    """
    without_prefix = worldbase.dir(download=True)
    without_prefix.sort()
    empty_prefix = worldbase.dir('', download=True)
    empty_prefix.sort()
    # nothing downloadable is expected in the test worldbase
    assert len(without_prefix) == 0
    assert without_prefix == empty_prefix
def check_dir_re(self):
    """Check pattern queries made through worldbase.dir(pattern, 'r').

    The second argument 'r' presumably selects regular-expression
    matching -- confirm against worldbase.dir().  self is unused.
    """
    expected = ['Bio.Annotation.annoDB', 'Bio.Annotation.map',
                'Bio.Seq.Swissprot.sp42', 'Bio.Seq.frag', 'Bio.Seq.spmap']
    expected.sort()
    found = worldbase.dir('^Bio', 'r')
    found.sort()
    assert found == expected

    expected = ['Bio.Seq.Swissprot.sp42', 'Bio.Seq.spmap']
    expected.sort()
    # raw string: '\.' is not a valid string escape, so the non-raw
    # spelling only worked by accident; the runtime value is unchanged
    found = worldbase.dir(r'^Bio\..+\.sp', 'r')
    found.sort()
    assert found == expected
def check_bind(self):
    """Check that the schema-bound 'buddy' attribute maps hbb to trypsin.

    self is unused.
    """
    swissprot = worldbase.Bio.Seq.Swissprot.sp42()
    source = swissprot['HBB1_TORMA']
    target = swissprot['PRCA_ANAVA']
    assert source.buddy == target, 'automatic schema attribute binding'
def check_bind2(self):
    """Check the schema-bound 'exons' attribute on swissprot sequences.

    HBB1_TORMA must yield exactly the stored annotation; querying
    HBB1_ONCMY must raise KeyError.  self is unused.
    """
    swissprot = worldbase.Bio.Seq.Swissprot.sp42()
    hbb = swissprot['HBB1_TORMA']
    found = hbb.exons.keys()
    assert len(found) == 1, 'number of expected annotations'

    annoDB = worldbase.Bio.Annotation.annoDB()
    exon = annoDB[1]
    first = found[0]
    assert first == exon, 'test annotation comparison'
    assert first.pathForward is exon, 'annotation parent match'
    assert first.sequence == hbb[10:50], 'annotation to sequence match'

    onc = swissprot['HBB1_ONCMY']
    try:
        onc.exons.keys()
    except KeyError:
        pass
    else:
        raise ValueError('failed to catch query with no annotations')
class Sequence_Test(TestBase):
    """Exercise worldbase with the swissprot resources saved and reloaded."""

    def setUp(self, *args, **kwargs):
        """Populate, commit and flush the cache so tests reload resources.

        args, kwargs -- forwarded to TestBase.setUp()
        """
        TestBase.setUp(self, *args, **kwargs)
        populate_swissprot()
        worldbase.commit()  # finally save everything
        worldbase.clear_cache()  # force all requests to reload

    def test_match(self):
        "Test matching sequences"
        check_match(self)

    def test_dir(self):
        "Test labels"
        for check in (check_dir, check_dir_noargs, check_dir_re):
            check(self)

    def test_bind(self):
        "Test bind"
        for check in (check_bind, check_bind2):
            check(self)

    def test_schema(self):
        "Test schema"
        datafile = testutil.datafile('sp_hbb1')

        # first round: add sp2 and a mapping sp -> sp2, then commit
        other = seqdb.BlastDB(datafile)
        other.__doc__ = 'another sp'
        worldbase.Bio.Seq.sp2 = other
        swissprot = worldbase.Bio.Seq.Swissprot.sp42()
        link = mapping.Mapping(sourceDB=swissprot, targetDB=other)
        link.__doc__ = 'sp -> sp2'
        worldbase.Bio.Seq.testmap = link
        worldbase.schema.Bio.Seq.testmap = metabase.OneToManyRelation(
            swissprot, other)
        worldbase.commit()

        worldbase.clear_cache()

        # second round: add sp3 and a mapping sp3 -> the reloaded sp2
        third = seqdb.BlastDB(datafile)
        third.__doc__ = 'sp number 3'
        worldbase.Bio.Seq.sp3 = third
        other = worldbase.Bio.Seq.sp2()
        link = mapping.Mapping(sourceDB=third, targetDB=other)
        link.__doc__ = 'sp3 -> sp2'
        worldbase.Bio.Seq.testmap2 = link
        worldbase.schema.Bio.Seq.testmap2 = metabase.OneToManyRelation(
            third, other)

        # only the resources touched since clear_cache() should be cached
        cached = list(worldbase._mdb.resourceCache.keys())
        cached.sort()
        assert cached == ['Bio.Seq.sp2', 'Bio.Seq.sp3', 'Bio.Seq.testmap2']
        worldbase.commit()

        # every expected resource must be a key of the storage graph
        graph = worldbase._mdb.writer.storage.graph
        found = set(graph.keys())
        expected = set(['Bio.Annotation.annoDB',
                        'Bio.Seq.Swissprot.sp42',
                        'Bio.Seq.sp2',
                        'Bio.Seq.sp3'])
        self.EQ(len(expected - found), 0)
class SQL_Sequence_Test(Sequence_Test):
    """Run the Sequence_Test cases with a 'mysql:' worldbase path."""

    def setUp(self):
        """Skip when MySQL is unavailable, else set up on a temp table.

        Raises SkipTest when testutil.mysql_enabled() is false.
        """
        if not testutil.mysql_enabled():
            # call form instead of the Python-2-only "raise E, msg" syntax
            raise SkipTest("no MySQL installed")

        self.dbtable = testutil.temp_table_name()  # create temp db tables
        Sequence_Test.setUp(self, worldbasePath='mysql:' + self.dbtable,
                            mdbArgs=dict(createLayer='temp'))

    def tearDown(self):
        """Drop the tables named by self.dbtable."""
        testutil.drop_tables(worldbase._mdb.writer.storage.cursor,
                             self.dbtable)
class InvalidPickle_Test(TestBase):
    """Check that metabase.dumps() rejects an object of a local class."""

    def setUp(self):
        """Prepare one picklable and one unpicklable object."""
        TestBase.setUp(self)

        class MyUnpicklableClass(object):
            # defined locally but labelled as living in '__main__'
            __module__ = '__main__'

        self.bad = MyUnpicklableClass()
        self.good = datetime.datetime.today()

    def test_invalid_pickle(self):
        "Testing an invalid pickle"
        metabase.dumps(self.good)  # should pickle with no errors
        try:
            metabase.dumps(self.bad)  # should raise exception
        except metabase.WorldbaseNoModuleError:
            pass
        else:
            raise ValueError(
                'failed to catch bad attempt to invalid module ref')
class XMLRPC_Test(TestBase):
    """Serve the swissprot resources over XMLRPC and query them remotely."""

    def setUp(self):
        """Save the test resources, then start a test XMLRPC server."""
        TestBase.setUp(self)
        populate_swissprot()  # save some data
        worldbase.commit()  # finally save everything to metabase
        worldbase.clear_cache()  # force all requests to reload

        served = ['Bio.Seq.Swissprot.sp42', 'Bio.Seq.frag', 'Bio.Seq.spmap',
                  'Bio.Annotation.annoDB', 'Bio.Annotation.map']
        self.server = testutil.TestXMLRPCServer(served, self.tempdir.path)

    def test_xmlrpc(self):
        "Test XMLRPC"
        worldbase.clear_cache()  # force all future requests to reload
        # switch worldbase over to the XMLRPC server
        worldbase.update("http://localhost:%s" % self.server.port)

        # run all the shared checks against the server
        for check in (check_match, check_dir, check_dir_noargs,
                      check_dir_download, check_dir_re,
                      check_bind, check_bind2):
            check(self)

        # read-only check: saving through the XMLRPC server must fail
        extra = seqdb.BlastDB(testutil.datafile('sp_hbb1'))
        extra.__doc__ = 'another sp'
        try:
            worldbase.Bio.Seq.sp2 = extra
            worldbase.commit()
        except ValueError:
            pass
        else:
            raise KeyError(
                'failed to catch bad attempt to write to XMLRPC server')

    def tearDown(self):
        'halt the test XMLRPC server'
        self.server.close()
if __name__ == '__main__':
    # executed as a script: hand control to the pygr test program
    PygrTestProgram(verbosity=2)