added MapView inverse example to tutorial
[pygr.git] / tests / seqdb_test.py
blob bba7af1f65e307003a10c9f80c6b1544c976bf78
1 """
2 Tests for the pygr.seqdb module.
3 """
5 import os
6 import unittest
8 from testlib import testutil, PygrTestProgram
9 from pygr.seqdb import SequenceDB, SequenceFileDB, PrefixUnionDict, \
10 AnnotationDB, SeqPrefixUnionDict
11 from pygr.sequence import Sequence
12 from pygr.cnestedlist import NLMSA
13 import gc
14 from pygr.annotation import AnnotationDB, AnnotationSeq, AnnotationSlice, \
15 AnnotationServer, AnnotationClient
# Utility classes for the SequenceDB tests.

# Fixed nucleotide string used by the fake sequence and fake database
# classes defined in this module.
_fake_seq = "ATCGAGAGCCAGAATGACGGGACCATTAG"
class _SimpleFakeSequence(Sequence):
    """Sequence stand-in that always wraps _fake_seq under the id "foo"."""

    def __init__(self, db, id):
        # Only the id "foo" is supported; the db argument is not used here.
        assert id == "foo"
        Sequence.__init__(self, _fake_seq, "foo")

    def __len__(self):
        residues = self.seq
        return len(residues)

    def strslice(self, start, end):
        residues = self.seq
        return residues[start:end]
class _SimpleFakeInfoObj(object):
    """Bare-bones info record that stores nothing but a ``length``."""

    def __init__(self, length):
        self.length = length
class _SimpleFakeSeqDB(SequenceDB):
    """SequenceDB stand-in whose seqInfoDict holds the single key 'foo'."""

    def __init__(self, *args, **kwargs):
        # seqInfoDict is assigned before the base-class constructor runs.
        foo_info = _SimpleFakeInfoObj(len(_fake_seq))
        self.seqInfoDict = {'foo': foo_info}
        SequenceDB.__init__(self, *args, **kwargs)
40 ###
class SequenceDB_Test(unittest.TestCase):
    """Checks on SequenceDB itself, using the fake classes of this module."""

    def test_repr(self):
        "test the __repr__ function."

        fake_db = _SimpleFakeSeqDB(itemClass=_SimpleFakeSequence)
        repr(fake_db)

    def test_create_no_itemclass(self):
        # must supply an itemclass to SequenceDB!
        try:
            SequenceDB()
        except TypeError:
            pass
        else:
            assert 0, "should not reach this point"
57 class SequenceFileDB_Test(unittest.TestCase):
58 """
59 Test for all of the basic dictionary functions on 'SequenceFileDB',
60 among other things.
61 """
62 def setUp(self):
63 "Test setup"
64 dnaseq = testutil.datafile('dnaseq.fasta')
65 self.db = SequenceFileDB(dnaseq) # contains 'seq1', 'seq2'
67 self.db._weakValueDict.clear() # clear the cache
69 def tearDown(self):
70 self.db.close() # must close SequenceFileDB!
72 def test_len(self):
73 assert len(self.db) == 2
75 def test_seqInfoDict_len(self):
76 assert len(self.db.seqInfoDict) == 2
78 def test_no_file_given(self):
79 "Make sure that a TypeError is raised when no file is available"
80 try:
81 db = SequenceFileDB()
82 assert 0, "should not reach this point"
83 except TypeError:
84 pass
86 def test_seq_descriptor(self):
87 "Check the '.seq' attribute (tied to a descriptor)"
88 s = self.db['seq1']
89 assert str(s) == str(s.seq)
91 def test_cache(self):
92 "SequenceDB cache test"
93 assert len(self.db._weakValueDict) == 0
94 seq1 = self.db['seq1']
96 # cache populated?
97 assert len(self.db._weakValueDict) == 1
98 assert 'seq1' in self.db._weakValueDict
100 # cache functions?
101 seq1_try2 = self.db['seq1']
102 assert seq1 is seq1_try2
104 def test_clear_cache(self):
105 "SequenceDB clear_cache test"
106 assert len(self.db._weakValueDict) == 0
107 seq1 = self.db['seq1']
109 # cache populated?
110 assert len(self.db._weakValueDict) == 1
111 assert 'seq1' in self.db._weakValueDict
113 # clear_cache functions?
114 self.db.clear_cache()
115 seq1_try3 = self.db['seq1']
116 assert seq1 is not seq1_try3
118 def test_keys(self):
119 "SequenceFileDB keys"
120 k = self.db.keys()
121 k.sort()
122 assert k == ['seq1', 'seq2']
124 def test_contains(self):
125 "SequenceFileDB contains"
126 assert 'seq1' in self.db, self.db.keys()
127 assert 'seq2' in self.db
128 assert 'foo' not in self.db
130 def test_invert_class(self):
131 "SequenceFileDB __invert__"
132 seq = self.db['seq1']
133 inversedb = ~self.db
134 assert inversedb[seq] == 'seq1'
135 assert seq in inversedb
136 assert 'foo' not in inversedb
138 def test_keys_info(self):
139 "SequenceFileDB keys info"
140 k = self.db.seqInfoDict.keys()
141 k.sort()
142 assert k == ['seq1', 'seq2']
144 def test_contains_info(self):
145 "SequenceFileDB contains info"
146 assert 'seq1' in self.db.seqInfoDict
147 assert 'seq2' in self.db.seqInfoDict
148 assert 'foo' not in self.db.seqInfoDict
150 def test_has_key(self):
151 "SequenceFileDB has key"
152 assert self.db.has_key('seq1')
153 assert self.db.has_key('seq2')
154 assert not self.db.has_key('foo')
156 def test_get(self):
157 "SequenceFileDB get"
158 assert self.db.get('foo') is None
159 assert self.db.get('seq1') is not None
160 assert str(self.db.get('seq1')).startswith('atggtgtca')
161 assert self.db.get('seq2') is not None
162 assert str(self.db.get('seq2')).startswith('GTGTTGAA')
164 def test_items(self):
165 "SequenceFileDB items"
166 i = [ k for (k,v) in self.db.items() ]
167 i.sort()
168 assert i == ['seq1', 'seq2']
170 def test_iterkeys(self):
171 "SequenceFileDB iterkeys"
172 kk = self.db.keys()
173 kk.sort()
174 ik = list(self.db.iterkeys())
175 ik.sort()
176 assert kk == ik
178 def test_itervalues(self):
179 "SequenceFileDB itervalues"
180 kv = self.db.values()
181 kv.sort()
182 iv = list(self.db.itervalues())
183 iv.sort()
184 assert kv == iv
186 def test_iteritems(self):
187 "SequenceFileDB iteritems"
188 ki = self.db.items()
189 ki.sort()
190 ii = list(self.db.iteritems())
191 ii.sort()
192 assert ki == ii
194 def test_readonly(self):
195 "SequenceFileDB readonly"
196 try:
197 self.db.copy() # what should 'copy' do on SequenceFileDB?
198 assert 0, 'this method should raise NotImplementedError'
199 except NotImplementedError:
200 pass
201 try:
202 self.db.clear()
203 assert 0, 'this method should raise NotImplementedError'
204 except NotImplementedError:
205 pass
206 try:
207 self.db.setdefault('foo')
208 assert 0, 'this method should raise NotImplementedError'
209 except NotImplementedError:
210 pass
211 try:
212 self.db.pop()
213 assert 0, 'this method should raise NotImplementedError'
214 except NotImplementedError:
215 pass
216 try:
217 self.db.popitem()
218 assert 0, 'this method should raise NotImplementedError'
219 except NotImplementedError:
220 pass
221 try:
222 self.db.update({})
223 assert 0, 'this method should raise NotImplementedError'
224 except NotImplementedError:
225 pass
227 # test some things other than dict behavior
228 def test_keyerror(self):
229 """SequenceFileDB keyerror.
230 Make sure that the SequenceFileDB KeyError is informative."""
231 try:
232 self.db['foo']
233 except KeyError, e:
234 assert "no key 'foo' in database <SequenceFileDB" in str(e), str(e)
236 def test_close(self):
237 """SequenceFileDB close.
238 Check closing behavior; access after close() --> ValueError """
239 self.db.close()
240 self.db.close() # closing twice should not raise an error
241 try:
242 len(self.db)
243 assert 0, 'Failed to catch invalid shelve access!'
244 except ValueError:
245 pass
246 try:
247 self.db['seq1']
248 assert 0, 'Failed to catch invalid shelve access!'
249 except ValueError:
250 pass
class SequenceFileDB_Creation_Test(unittest.TestCase):
    """
    Test some of the nastier / more polluting creation code in an
    isolated (and slower...) class that cleans up after itself.
    """
    def trash_intermediate_files(self):
        """Delete the .seqlen and .pureseq files next to dnaseq.fasta.

        Each file is removed independently, so a missing first file no
        longer prevents the second one from being deleted.
        """
        for name in ('dnaseq.fasta.seqlen', 'dnaseq.fasta.pureseq'):
            path = testutil.datafile(name)
            try:
                os.unlink(path)
            except OSError:
                pass # best effort: the file may simply not exist

    def setUp(self):
        "Test setup"
        self.trash_intermediate_files()
        self.dbfile = testutil.datafile('dnaseq.fasta')

    def tearDown(self):
        self.trash_intermediate_files()

    def test_basic_construction(self):
        db = SequenceFileDB(self.dbfile)
        try:
            assert str(db.get('seq1')).startswith('atggtgtca')
            assert str(db.get('seq2')).startswith('GTGTTGAA')
        finally:
            db.close()

    def test_build_seqLenDict_with_reader(self):
        "Test that building things works properly when specifying a reader."

        class InfoBag(object):
            def __init__(self, **kw):
                self.__dict__.update(kw)

        # first, load the db & save the sequence info in a list
        l = []
        db = SequenceFileDB(self.dbfile)
        try:
            for k, v in db.items():
                info = InfoBag(id=k, length=len(v), sequence=str(v))
                l.append(info)
        finally:
            # now, erase the existing files, and recreate the db.
            db.close()
        self.trash_intermediate_files()

        # create a fake reader with access to the saved info
        def my_fake_reader(fp, filename, info_list=l):
            return info_list

        # now try creating with the fake reader
        db = SequenceFileDB(self.dbfile, reader=my_fake_reader)

        # did it work?
        try:
            assert str(db.get('seq1')).startswith('atggtgtca')
            assert str(db.get('seq2')).startswith('GTGTTGAA')
        finally:
            db.close()

    def test_build_seqLenDict_with_bad_reader(self):
        "Test that building things fails properly with a bad reader."

        class InfoBag(object):
            def __init__(self, **kw):
                self.__dict__.update(kw)

        # first, load the db & save the sequence info in a list;
        # every record is given length=0 to make the reader "bad".
        l = []
        db = SequenceFileDB(self.dbfile)
        try:
            for k, v in db.items():
                info = InfoBag(id=k, length=0, sequence=str(v))
                l.append(info)
        finally:
            # now, erase the existing files, and recreate the db.
            db.close()
        self.trash_intermediate_files()

        # create a fake reader with access to the saved info
        def my_fake_reader(fp, filename, info_list=l):
            return info_list

        # now try creating with the fake reader
        try:
            db = SequenceFileDB(self.dbfile, reader=my_fake_reader)
            try:
                assert 0, "should not reach here; db construction should fail!"
            finally:
                db.close()
        except ValueError:
            pass # ValueError is expected
def close_pud_dicts(pud):
    """Call close() on each seq db found by iterating ``pud.dicts``."""
    for member_db in pud.dicts:
        member_db.close()
class PrefixUnionDict_Creation_Test(unittest.TestCase):
    """
    Test PUD creation options.
    """
    def setUp(self):
        self.dbfile = testutil.datafile('dnaseq.fasta')

    def test_empty_create(self):
        db = PrefixUnionDict()
        assert len(db) == 0

    def test_headerfile_create(self):
        header = testutil.datafile('prefixUnionDict-1.txt')
        db = PrefixUnionDict(filename=header)
        try:
            assert len(db) == 2
            assert 'a.seq1' in db
        finally:
            close_pud_dicts(db)

    def test_headerfile_create_conflict(self):
        "test non-empty prefixDict with a passed in PUD header file: conflict"
        subdb = SequenceFileDB(self.dbfile)
        try:
            header = testutil.datafile('prefixUnionDict-1.txt')
            try:
                db = PrefixUnionDict(filename=header, prefixDict={ 'foo' : subdb })
                assert 0, "should not get here"
            except TypeError:
                pass
        finally:
            subdb.close()

    def test_multiline_headerfile_create(self):
        header = testutil.datafile('prefixUnionDict-2.txt')
        db = PrefixUnionDict(filename=header)
        try:
            assert len(db) == 4
            assert 'a.seq1' in db
            assert 'b.seq1' in db
        finally:
            close_pud_dicts(db)

    def test_headerfile_create_with_trypath(self):
        header = testutil.datafile('prefixUnionDict-1.txt')
        db = PrefixUnionDict(filename=header,
                             trypath=[os.path.dirname(header)])
        try:
            assert len(db) == 2, db.prefixDict
        finally:
            close_pud_dicts(db)

    def test_headerfile_create_fail(self):
        header = testutil.datafile('prefixUnionDict-3.txt')
        try:
            db = PrefixUnionDict(filename=header)
            assert 0, "should not reach this point"
        except IOError:
            pass
        except AssertionError:
            # construction unexpectedly succeeded: release the dbs, then fail
            close_pud_dicts(db)
            raise

    def test_headerfile_write(self):
        header = testutil.datafile('prefixUnionDict-2.txt')
        db = PrefixUnionDict(filename=header)
        try:
            assert len(db) == 4
            assert 'a.seq1' in db
            assert 'b.seq1' in db

            output = testutil.tempdatafile('prefixUnionDict-write.txt')
            db.writeHeaderFile(output)
        finally:
            close_pud_dicts(db)

        # re-open from the header file that was just written
        db2 = PrefixUnionDict(filename=output,
                              trypath=[os.path.dirname(header)])
        try:
            assert len(db2) == 4
            assert 'a.seq1' in db2
            assert 'b.seq1' in db2
        finally:
            close_pud_dicts(db2)

    def test_headerfile_write_fail(self):
        subdb = SequenceFileDB(self.dbfile)
        try:
            del subdb.filepath  # remove 'filepath' attribute for test
            db = PrefixUnionDict({ 'prefix' : subdb })

            assert len(db) == 2
            assert 'prefix.seq1' in db

            output = testutil.tempdatafile('prefixUnionDict-write-fail.txt')
            try:
                db.writeHeaderFile(output)
            except AttributeError:
                pass
            else:
                # Previously a successful write went unnoticed, so this
                # "fail" test could never actually fail.
                assert 0, "should not reach this point"
        finally:
            subdb.close() # closes both db and subdb
456 class PrefixUnionDict_Test(unittest.TestCase):
458 Test for all of the basic dictionary functions on 'PrefixUnionDict'.
460 def setUp(self):
461 dnaseq = testutil.datafile('dnaseq.fasta')
462 seqdb = SequenceFileDB(dnaseq) # contains 'seq1', 'seq2'
463 self.db = PrefixUnionDict({ 'prefix' : seqdb })
465 def tearDown(self):
466 close_pud_dicts(self.db)
468 def test_keys(self):
469 "PrefixUnionDict keys"
470 k = self.db.keys()
471 k.sort()
472 assert k == ['prefix.seq1', 'prefix.seq2']
474 def test_contains(self):
475 "PrefixUnionDict contains"
476 # first, check "is this sequence name in the PUD?"-style contains.
477 assert 'prefix.seq1' in self.db
478 assert 'prefix.seq2' in self.db
479 assert 'foo' not in self.db
480 assert 'prefix.foo' not in self.db
482 # now, check "is this sequence in the PUD?"
483 seq = self.db['prefix.seq1']
484 assert seq in self.db
486 # finally, check failure: "is something other than str/seq in db"
487 try:
488 12345 in self.db
489 assert 0, "should not get to this point"
490 except AttributeError:
491 pass
493 def test_invert_class(self):
494 "PrefixUnionDict __invert__"
495 seq = self.db['prefix.seq1']
496 inversedb = ~self.db
497 assert inversedb[seq] == 'prefix.seq1'
498 assert seq in inversedb
499 assert 'foo' not in inversedb
501 def test_funny_key(self):
502 "check handling of ID containing multiple separators"
503 dnaseq = testutil.datafile('funnyseq.fasta')
504 seqdb = SequenceFileDB(dnaseq) # contains 'seq1', 'seq2'
505 try:
506 pudb = PrefixUnionDict({ 'prefix' : seqdb })
507 seq = pudb['prefix.seq.1.more']
508 finally:
509 seqdb.close()
511 def test_funny_key2(self):
512 "check handling of ID containing multiple separators"
513 dnaseq = testutil.datafile('funnyseq.fasta')
514 seqdb = SequenceFileDB(dnaseq) # contains 'seq1', 'seq2'
515 try:
516 pudb = PrefixUnionDict({ 'prefix' : seqdb })
517 seq = pudb['prefix.seq.2.even.longer']
518 finally:
519 seqdb.close()
521 def test_has_key(self):
522 "PrefixUnionDict has key"
523 assert self.db.has_key('prefix.seq1')
524 assert self.db.has_key('prefix.seq2')
525 assert not self.db.has_key('prefix.foo')
526 assert not self.db.has_key('foo')
528 def test_get(self):
529 "PrefixUnionDict get"
530 assert self.db.get('foo') is None
531 assert self.db.get('prefix.foo') is None
532 assert self.db.get('prefix.seq1') is not None
533 assert str(self.db.get('prefix.seq1')).startswith('atggtgtca')
534 assert self.db.get('prefix.seq2') is not None
535 assert str(self.db.get('prefix.seq2')).startswith('GTGTTGAA')
536 assert self.db.get('foo.bar') is None
537 assert self.db.get(12345) is None
539 def test_get_prefix_id(self):
540 try:
541 self.db.get_prefix_id(12345)
542 assert 0, "should not get here"
543 except KeyError:
544 pass
546 def test_getName(self):
547 seq1 = self.db['prefix.seq1']
548 name = self.db.getName(seq1)
549 assert name == 'prefix.seq1'
551 def test_items(self):
552 "PrefixUnionDict items"
553 i = [ k for (k,v) in self.db.items() ]
554 i.sort()
555 assert i == ['prefix.seq1', 'prefix.seq2']
557 def test_iterkeys(self):
558 "PrefixUnionDict iterkeys"
559 kk = self.db.keys()
560 kk.sort()
561 ik = list(self.db.iterkeys())
562 ik.sort()
563 assert kk == ik
565 def test_itervalues(self):
566 "PrefixUnionDict itervalues"
567 kv = self.db.values()
568 kv.sort()
569 iv = list(self.db.itervalues())
570 iv.sort()
571 assert kv == iv
573 def test_iteritems(self):
574 "PrefixUnionDict iteritems"
575 ki = self.db.items()
576 ki.sort()
577 ii = list(self.db.iteritems())
578 ii.sort()
579 assert ki == ii
581 # test some things other than dict behavior
582 def test_keyerror(self):
583 "PrefixUnionDict keyerror"
584 "Make sure that the PrefixUnionDict KeyError is informative."
585 try:
586 self.db['prefix.foo']
587 except KeyError, e:
588 assert "no key 'foo' in " in str(e), str(e)
589 try:
590 self.db['foo']
591 except KeyError, e:
592 assert "invalid id format; no prefix: foo" in str(e), str(e)
594 def test_readonly(self):
595 "PrefixUnionDict readonly"
596 try:
597 self.db.copy() # what should 'copy' do on PUD?
598 assert 0, 'this method should raise NotImplementedError'
599 except NotImplementedError:
600 pass
601 try: # what should 'setdefault' do on PUD?
602 self.db.setdefault('foo')
603 assert 0, 'this method should raise NotImplementedError'
604 except NotImplementedError:
605 pass
606 try: # what should 'update' do on PUD?
607 self.db.update({})
608 assert 0, 'this method should raise NotImplementedError'
609 except NotImplementedError:
610 pass
611 try:
612 self.db.clear()
613 assert 0, 'this method should raise NotImplementedError'
614 except NotImplementedError:
615 pass
616 try:
617 self.db.pop()
618 assert 0, 'this method should raise NotImplementedError'
619 except NotImplementedError:
620 pass
621 try:
622 self.db.popitem()
623 assert 0, 'this method should raise NotImplementedError'
624 except NotImplementedError:
625 pass
627 def test_seqInfoDict(self):
628 seqInfoDict = self.db.seqInfoDict
630 keylist = seqInfoDict.keys()
631 keylist.sort()
633 keylist2 = list(seqInfoDict)
634 keylist2.sort()
636 assert keylist == ['prefix.seq1', 'prefix.seq2']
637 assert keylist2 == ['prefix.seq1', 'prefix.seq2']
639 itemlist = list(seqInfoDict.iteritems())
640 itemlist.sort()
641 ((n1, i1), (n2, i2)) = itemlist
643 ii1, ii2 = list(seqInfoDict.itervalues())
645 s1i = seqInfoDict['prefix.seq1']
646 s2i = seqInfoDict['prefix.seq2']
648 assert n1 == 'prefix.seq1'
649 assert (i1.id, i1.db) == (s1i.id, s1i.db)
650 assert (ii1.id, ii1.db) == (s1i.id, s1i.db)
651 assert n2 == 'prefix.seq2'
652 assert (i2.id, i2.db) == (s2i.id, s2i.db)
653 assert (ii2.id, ii2.db) == (s2i.id, s2i.db)
655 assert seqInfoDict.has_key('prefix.seq1')
class PrefixUnionMemberDict_Test(unittest.TestCase):
    """Checks on the member dict returned by PrefixUnionDict.newMemberDict()."""

    def setUp(self):
        fasta_path = testutil.datafile('dnaseq.fasta')
        source_db = SequenceFileDB(fasta_path) # holds 'seq1' and 'seq2'
        self.db = PrefixUnionDict({'prefix': source_db})
        self.mdb = self.db.newMemberDict()

    def tearDown(self):
        close_pud_dicts(self.db)

    def test_basic(self):
        # a value stored under the prefix is found again via a sequence
        self.mdb['prefix'] = 'this is from seqdb dnaseq.fasta'
        member_seq = self.db['prefix.seq1']
        stored = self.mdb[member_seq]
        assert stored == 'this is from seqdb dnaseq.fasta'

    def test_possible_keys(self):
        candidates = list(self.mdb.possibleKeys())
        assert candidates == ['prefix']

    def test_bad_prefix(self):
        try:
            self.mdb['foo'] = "xyz"
        except KeyError:
            pass
        else:
            assert 0, "should fail before this"

    def test_bad_keytype(self):
        try:
            self.mdb['some non-seq-obj']
        except TypeError:
            pass
        else:
            assert 0, "should fail before this"

    def test_default_val(self):
        self.mdb = self.db.newMemberDict(default='baz')
        member_seq = self.db['prefix.seq1']
        assert self.mdb[member_seq] == 'baz'

    def test_no_default_val(self):
        self.mdb = self.db.newMemberDict()
        member_seq = self.db['prefix.seq1']
        try:
            self.mdb[member_seq]
        except KeyError:
            pass
        else:
            assert 0, "should fail before this"
class SeqPrefixUnionDict_Test(unittest.TestCase):
    """
    Test SeqPrefixUnionDict.
    """
    def setUp(self):
        fasta_path = testutil.datafile('dnaseq.fasta')
        self.seqdb = SequenceFileDB(fasta_path) # holds 'seq1' and 'seq2'
        self.db = SeqPrefixUnionDict({'prefix': self.seqdb})

    def tearDown(self):
        self.seqdb.close()

    def test_basic_iadd(self):
        fasta_path = testutil.datafile('dnaseq.fasta')
        first_db = SequenceFileDB(fasta_path)
        try:
            first_seq = first_db['seq1']

            self.db += first_seq

            assert first_seq in self.db
            first_name = (~self.db)[first_seq]
            assert first_name == 'dnaseq.seq1', first_name

            second_db = SequenceFileDB(fasta_path)
            try:
                second_db.filepath = 'foo' # munge the filepath for testing
                second_seq = second_db['seq1']

                self.db += second_seq
                second_name = (~self.db)[second_seq]
                assert second_name == 'foo.seq1', second_name
            finally:
                second_db.close()
        finally:
            first_db.close()
        # NOTE: what matters here is less the specific names that are
        # given (they are based on filepath) than the fact that different
        # names are created for the various sequences when they are added.

    def test_iadd_db_twice(self):
        fasta_path = testutil.datafile('dnaseq.fasta')
        extra_db = SequenceFileDB(fasta_path)
        try:
            added_seq = extra_db['seq1']

            self.db += added_seq
            name_before = (~self.db)[added_seq]

            self.db += added_seq              # should do nothing...
            name_after = (~self.db)[added_seq]
            assert name_before == name_after  # ...leaving seq with same name.
        finally:
            extra_db.close()

    def test_iadd_user_seq(self):
        user_seq = Sequence('ATGGCAGG', 'foo')
        self.db += user_seq

        user_name = (~self.db)[user_seq]
        assert user_name == 'user.foo'  # created a new 'user' db.

        # ok, make sure it doesn't wipe out the old 'user' db...
        user_seq2 = Sequence('ATGGCAGG', 'foo2')
        self.db += user_seq2

        user_name2 = (~self.db)[user_seq2]
        assert user_name2 == 'user.foo2'

        first_name = (~self.db)[user_seq]
        assert first_name == 'user.foo'

    def test_iadd_duplicate_seqdb(self):
        fasta_path = testutil.datafile('dnaseq.fasta')
        first_db = SequenceFileDB(fasta_path)
        try:
            second_db = SequenceFileDB(fasta_path)
            try:
                first_seq = first_db['seq1']
                second_seq = second_db['seq1']

                self.db += first_seq
                try:
                    self.db += second_seq
                except ValueError:
                    pass
                else:
                    assert 0, "should never reach this point"
            finally:
                second_db.close()
        finally:
            first_db.close()

    def test_no_db_info(self):
        fasta_path = testutil.datafile('dnaseq.fasta')
        anon_db = SequenceFileDB(fasta_path)
        try:
            anon_seq = anon_db['seq1']

            # strip everything a name could be derived from
            assert getattr(anon_db, '_persistent_id', None) is None
            del anon_db.filepath

            self.db += anon_seq
            assigned = (~self.db)[anon_seq]
            assert assigned == 'noname0.seq1'
        finally:
            anon_db.close()

    def test_inverse_add_behavior(self):
        fasta_path = testutil.datafile('dnaseq.fasta')
        outside_db = SequenceFileDB(fasta_path)
        try:
            outside_seq = outside_db['seq1']

            # inverse lookup of a sequence that was never added with +=
            name = (~self.db)[outside_seq]
        finally:
            outside_db.close() # only need to close if exception occurs

    def test_inverse_noadd_behavior(self):
        # compare with test_inverse_add_behavior...
        strict_union = SeqPrefixUnionDict(addAll=False)
        fasta_path = testutil.datafile('dnaseq.fasta')
        outside_db = SequenceFileDB(fasta_path)
        try:
            outside_seq = outside_db['seq1']

            try:
                name = (~strict_union)[outside_seq]
            except KeyError:
                pass
            else:
                assert 0, "should not get here"
        finally:
            outside_db.close()
class SeqDBCache_Test(unittest.TestCase):
    # NOTE: both tests depend on exactly when references are dropped, so
    # the order of statements (and every 'del') must be preserved.

    def test_cache(self):
        "Sequence slice cache mechanics."

        fasta_path = testutil.datafile('dnaseq.fasta')
        db = SequenceFileDB(fasta_path)

        try:
            # create cache components
            hint_dict = {}
            register_hint = db.cacheHint

            # get seq1
            seq1 = db['seq1']

            # _cache is only created on first cache attempt
            assert not hasattr(db, '_cache')

            # build an 'owner' object
            class AnonymousOwner(object):
                pass
            owner = AnonymousOwner()

            # save seq1 in cache
            hint_dict['seq1'] = (seq1.start, seq1.stop)
            register_hint(hint_dict, owner)
            del hint_dict                   # 'owner' now holds reference

            # peek into _cache: only the ival coordinates should be stored
            cached = db._cache.values()[0]
            assert len(cached['seq1']) == 2
            del cached

            # force a cache access...
            piece = str(seq1[5:10])
            cached = db._cache.values()[0]
            # ...and check that we've now stored the actual string
            assert len(cached['seq1']) == 3

            # force another cache access, this time hitting the stored string
            piece = str(seq1[5:10])

            # now, eliminate all references to the cache proxy dict
            del owner

            # trash unused objects - not strictly necessary, because there
            # are no islands of circular references & so all objects are
            # already deallocated, but that's implementation dependent.
            gc.collect()

            # ok, cached values should now be gone.
            cached = db._cache.values()
            assert len(cached) == 0
        finally:
            db.close()

    def test_nlmsaslice_cache(self):
        "NLMSASlice sequence caching & removal"

        # set up sequences
        fasta_path = testutil.datafile('dnaseq.fasta')

        db = SequenceFileDB(fasta_path, autoGC=-1) # use pure WeakValueDict...
        try:
            gc.collect()
            assert len(db._weakValueDict)==0, '_weakValueDict should be empty'
            seq1, seq2 = db['seq1'], db['seq2']
            assert len(db._weakValueDict)==2, '_weakValueDict should have 2 seqs'

            # build referencing NLMSA
            alignment = NLMSA('test', 'memory', db, pairwiseMode=True)
            alignment += seq1
            alignment[seq1] += seq2
            alignment.build()

            # check: no cache
            assert not hasattr(db, '_cache'), 'should be no cache yet'

            seq1, seq2 = db['seq1'], db['seq2'] # re-retrieve
            # now retrieve a NLMSASlice, forcing entry of seq into cache
            interval = seq1[5:10]
            msa_slice = alignment[interval]

            assert len(db._cache.values()) != 0

            entries_before = len(db._cache)
            assert entries_before == 1, \
                "should be exactly one cache entry, not %d" % (entries_before,)

            # ok, now trash referencing arguments & make sure of cleanup
            del msa_slice
            gc.collect()

            assert len(db._cache.values()) == 0

            entries_after = len(db._cache)
            assert entries_after == 0, \
                '%d objects remain; cache memory leak!' % entries_after
            # Original author's note: FAIL because of __dealloc__ error in
            # cnestedlist.NLMSASlice.

            # drop our references, cache should empty
            del alignment, interval, seq1, seq2
            gc.collect()
            # check that db._weakValueDict cache is empty
            assert len(db._weakValueDict)==0, '_weakValueDict should be empty'
        finally:
            db.close()
# Run the tests in this module when the file is executed as a script.
if __name__ == '__main__':
    PygrTestProgram(verbosity=2)