2 from testlib
import testutil
, PygrTestProgram
4 import ConfigParser
, sys
, os
, string
, glob
12 config
= ConfigParser
.ConfigParser({'testOutputBaseDir' : '.', 'smallSampleKey_nlmsa': ''})
13 config
.read([ os
.path
.join(os
.path
.expanduser('~'), '.pygrrc'), os
.path
.join(os
.path
.expanduser('~'), 'pygr.cfg'), '.pygrrc', 'pygr.cfg' ])
14 mafDir
= config
.get('megatests_dm2', 'mafDir')
15 seqDir
= config
.get('megatests_dm2', 'seqDir')
16 smallSampleKey
= config
.get('megatests_dm2', 'smallSampleKey_nlmsa')
17 testInputDir
= config
.get('megatests', 'testInputDir')
18 testOutputBaseDir
= config
.get('megatests', 'testOutputBaseDir')
21 smallSamplePostfix
= '_' + smallSampleKey
23 smallSamplePostfix
= ''
25 ## mafDir CONTAINS FOLLOWING DM2 MULTIZ15WAY MAF ALIGNMENTS
26 ## seqDir CONTAINS FOLLOWING 15 GENOME ASSEMBLIES AND THEIR SEQDB FILES
27 ## TEST INPUT/OUPTUT FOR COMPARISON, THESE FILES SHOULD BE IN THIS DIRECTORY
28 ## outfileName = 'splicesite_dm2.txt' # CHR4H TESTING
29 ## outputName = 'splicesite_dm2_multiz15way.txt' # CHR4H TESTING
30 ## testDir = os.path.join(testOutputBaseDir, 'TEST_' + ''.join(tmpList)) SHOULD BE DELETED IF YOU WANT TO RUN IN '.'
32 # DIRECTIONARY FOR DOC STRING OF SEQDB
34 'anoGam1':'A. gambiae Genome (February 2003)',
35 'apiMel2':'A. mellifera Genome (January 2005)',
36 'dm2':'D. melanogaster Genome (April 2004)',
37 'dp4':'D. pseudoobscura Genome (February 2006)',
38 'droAna3':'D. ananassae Genome (February 2006)',
39 'droEre2':'D. erecta Genome (February 2006)',
40 'droGri2':'D. grimshawi Genome (February 2006)',
41 'droMoj3':'D. mojavensis Genome (February 2006)',
42 'droPer1':'D. persimilis Genome (October 2005)',
43 'droSec1':'D. sechellia Genome (October 2005)',
44 'droSim1':'D. simulans Genome (April 2005)',
45 'droVir3':'D. virilis Genome (February 2006)',
46 'droWil1':'D. willistoni Genome (February 2006)',
47 'droYak2':'D. yakuba Genome (November 2005)',
48 'triCas2':'T. castaneum Genome (September 2005)',
51 # GENOME ASSEMBLY LIST FOR DM2 MULTIZ15WAY
52 msaSpeciesList
= ['anoGam1', 'apiMel2', 'dm2', 'dp4', 'droAna3', 'droEre2', 'droGri2', 'droMoj3', \
53 'droPer1', 'droSec1', 'droSim1', 'droVir3', 'droWil1', 'droYak2', 'triCas2']
55 class PygrBuildNLMSAMegabase(unittest
.TestCase
):
56 'restrict megatest to an initially empty directory, need large space to perform'
57 def setUp(self
, testDir
= None):
59 tmpList
= [c
for c
in 'PygrBuildNLMSAMegabase']
60 random
.shuffle(tmpList
)
61 testDir
= os
.path
.join(testOutputBaseDir
, 'TEST_' + ''.join(tmpList
)) # FOR TEST, SHOULD BE DELETED
62 if testDir
is None: testDir
= 'TEST_' + ''.join(tmpList
) # NOT SPECIFIED, USE CURRENT DIRECTORY
65 testDir
= os
.path
.realpath(testDir
)
70 tmpFileName
= os
.path
.join(testDir
, 'DELETE_THIS_TEMP_FILE')
71 open(tmpFileName
, 'w').write('A'*1024*1024) # WRITE 1MB FILE FOR TESTING
74 pygr
.Data
.update(self
.path
)
75 from pygr
import seqdb
76 for orgstr
in msaSpeciesList
:
77 genome
= seqdb
.BlastDB(os
.path
.join(seqDir
, orgstr
))
78 genome
.__doc
__ = docStringDict
[orgstr
]
79 pygr
.Data
.addResource('TEST.Seq.Genome.' + orgstr
, genome
)
81 def copyFile(self
, filename
): # COPY A FILE INTO TEST DIRECTORY
82 newname
= os
.path
.join(self
.path
, os
.path
.basename(filename
))
83 open(newname
, 'w').write(open(filename
, 'r').read())
86 'delete the temporary directory and files, restore pygr.Data path'
87 for dirpath
, subdirs
, files
in os
.walk(self
.path
, topdown
= False): # SHOULD BE DELETED BOTTOM-UP FASHION
88 # THIS PART MAY NOT WORK IN NFS MOUNTED DIRECTORY DUE TO .nfsXXXXXXXXX CREATION
89 # IN NFS MOUNTED DIRECTORY, IT CANNOT BE DELETED UNTIL CLOSING PYGRDATA
90 for filename
in files
:
91 os
.remove(os
.path
.join(dirpath
, filename
))
93 # Restore original pygr.Data path to remedy lack of isolation
94 # between tests from the same run
95 pygr
.Data
.update(None)
98 class Build_Test(PygrBuildNLMSAMegabase
):
100 'Check pygr.Data contents'
101 l
= pygr
.Data
.dir('TEST')
102 preList
= ['TEST.Seq.Genome.' + orgstr
for orgstr
in msaSpeciesList
]
104 def test_build(self
):
105 'Test building an NLMSA and querying results'
106 from pygr
import seqdb
, cnestedlist
108 for orgstr
in msaSpeciesList
:
109 genomedict
[orgstr
] = pygr
.Data
.getResource('TEST.Seq.Genome.' + orgstr
)
110 uniondict
= seqdb
.PrefixUnionDict(genomedict
)
112 maflist
= ( os
.path
.join(mafDir
, smallSampleKey
+ '.maf'), )
114 maflist
= glob
.glob(os
.path
.join(mafDir
, '*.maf'))
116 msaname
= os
.path
.join(self
.path
, 'dm2_multiz15way')
117 msa1
= cnestedlist
.NLMSA(msaname
, 'w', uniondict
, maflist
, maxlen
= 536870912, maxint
= 22369620) # 500MB VERSION
118 msa1
.__doc__
= 'TEST NLMSA for dm2 multiz15way'
119 pygr
.Data
.addResource('TEST.MSA.UCSC.dm2_multiz15way', msa1
)
121 msa
= pygr
.Data
.getResource('TEST.MSA.UCSC.dm2_multiz15way')
122 outfileName
= os
.path
.join(testInputDir
, 'splicesite_dm2%s.txt' % smallSamplePostfix
)
123 outputName
= os
.path
.join(testInputDir
, 'splicesite_dm2%s_multiz15way.txt' % smallSamplePostfix
)
124 newOutputName
= os
.path
.join(self
.path
, 'splicesite_new1.txt')
125 tmpInputName
= self
.copyFile(outfileName
)
126 tmpOutputName
= self
.copyFile(outputName
)
127 outfile
= open(newOutputName
, 'w')
128 for lines
in open(tmpInputName
, 'r').xreadlines():
129 chrid
, intstart
, intend
, nobs
= string
.split(lines
.strip(), '\t')
130 intstart
, intend
, nobs
= int(intstart
), int(intend
), int(nobs
)
131 site1
= msa
.seqDict
['dm2' + '.' + chrid
][intstart
:intstart
+2]
132 site2
= msa
.seqDict
['dm2' + '.' + chrid
][intend
-2:intend
]
133 edges1
= msa
[site1
].edges()
134 edges2
= msa
[site2
].edges()
135 if len(edges1
) == 0: # EMPTY EDGES
136 wlist
= str(site1
), 'dm2', chrid
, intstart
, intstart
+2, '', '', '', '', ''
137 outfile
.write('\t'.join(map(str, wlist
)) + '\n')
138 if len(edges2
) == 0: # EMPTY EDGES
139 wlist
= str(site2
), 'dm2', chrid
, intend
-2, intend
, '', '', '', '', ''
140 outfile
.write('\t'.join(map(str, wlist
)) + '\n')
142 for src
, dest
, e
in edges1
:
143 if len(str(src
)) != 2 or len(str(dest
)) != 2: continue
144 dotindex
= (~msa
.seqDict
)[src
].index('.')
145 srcspecies
, src1
= (~msa
.seqDict
)[src
][:dotindex
], (~msa
.seqDict
)[src
][dotindex
+1:]
146 dotindex
= (~msa
.seqDict
)[dest
].index('.')
147 destspecies
, dest1
= (~msa
.seqDict
)[dest
][:dotindex
], (~msa
.seqDict
)[dest
][dotindex
+1:]
148 wlist
= str(src
), srcspecies
, src1
, src
.start
, src
.stop
, str(dest
), \
149 destspecies
, dest1
, dest
.start
, dest
.stop
150 saveList
.append('\t'.join(map(str, wlist
)) + '\n')
151 for src
, dest
, e
in edges2
:
152 if len(str(src
)) != 2 or len(str(dest
)) != 2: continue
153 dotindex
= (~msa
.seqDict
)[src
].index('.')
154 srcspecies
, src1
= (~msa
.seqDict
)[src
][:dotindex
], (~msa
.seqDict
)[src
][dotindex
+1:]
155 dotindex
= (~msa
.seqDict
)[dest
].index('.')
156 destspecies
, dest1
= (~msa
.seqDict
)[dest
][:dotindex
], (~msa
.seqDict
)[dest
][dotindex
+1:]
157 wlist
= str(src
), srcspecies
, src1
, src
.start
, src
.stop
, str(dest
), \
158 destspecies
, dest1
, dest
.start
, dest
.stop
159 saveList
.append('\t'.join(map(str, wlist
)) + '\n')
160 saveList
.sort() # SORTED IN ORDER TO COMPARE WITH PREVIOUS RESULTS
161 for saveline
in saveList
:
162 outfile
.write(saveline
)
164 md5old
= hashlib
.md5()
165 md5old
.update(open(newOutputName
, 'r').read())
166 md5new
= hashlib
.md5()
167 md5new
.update(open(tmpOutputName
, 'r').read())
168 assert md5old
.digest() == md5new
.digest() # MD5 COMPARISON INSTEAD OF COMPARING EACH CONTENTS
171 msafilelist
= glob
.glob(msaname
+ '*')
173 cnestedlist
.dump_textfile(msaname
, os
.path
.join(self
.path
, 'dm2_multiz15way.txt'))
174 for filename
in msafilelist
: os
.remove(filename
)
175 runPath
= os
.path
.realpath(os
.curdir
)
177 cnestedlist
.textfile_to_binaries('dm2_multiz15way.txt')
180 msa1
= cnestedlist
.NLMSA(msaname
, 'r')
181 msa1
.__doc__
= 'TEST NLMSA for dm2 multiz15way'
182 pygr
.Data
.addResource('TEST.MSA.UCSC.dm2_multiz15way', msa1
)
184 msa
= pygr
.Data
.getResource('TEST.MSA.UCSC.dm2_multiz15way')
185 newOutputName
= os
.path
.join(self
.path
, 'splicesite_new2.txt')
186 tmpInputName
= self
.copyFile(outfileName
)
187 tmpOutputName
= self
.copyFile(outputName
)
188 outfile
= open(newOutputName
, 'w')
189 for lines
in open(tmpInputName
, 'r').xreadlines():
190 chrid
, intstart
, intend
, nobs
= string
.split(lines
.strip(), '\t')
191 intstart
, intend
, nobs
= int(intstart
), int(intend
), int(nobs
)
192 site1
= msa
.seqDict
['dm2' + '.' + chrid
][intstart
:intstart
+2]
193 site2
= msa
.seqDict
['dm2' + '.' + chrid
][intend
-2:intend
]
194 edges1
= msa
[site1
].edges()
195 edges2
= msa
[site2
].edges()
196 if len(edges1
) == 0: # EMPTY EDGES
197 wlist
= str(site1
), 'dm2', chrid
, intstart
, intstart
+2, '', '', '', '', ''
198 outfile
.write('\t'.join(map(str, wlist
)) + '\n')
199 if len(edges2
) == 0: # EMPTY EDGES
200 wlist
= str(site2
), 'dm2', chrid
, intend
-2, intend
, '', '', '', '', ''
201 outfile
.write('\t'.join(map(str, wlist
)) + '\n')
203 for src
, dest
, e
in edges1
:
204 if len(str(src
)) != 2 or len(str(dest
)) != 2: continue
205 dotindex
= (~msa
.seqDict
)[src
].index('.')
206 srcspecies
, src1
= (~msa
.seqDict
)[src
][:dotindex
], (~msa
.seqDict
)[src
][dotindex
+1:]
207 dotindex
= (~msa
.seqDict
)[dest
].index('.')
208 destspecies
, dest1
= (~msa
.seqDict
)[dest
][:dotindex
], (~msa
.seqDict
)[dest
][dotindex
+1:]
209 wlist
= str(src
), srcspecies
, src1
, src
.start
, src
.stop
, str(dest
), \
210 destspecies
, dest1
, dest
.start
, dest
.stop
211 saveList
.append('\t'.join(map(str, wlist
)) + '\n')
212 for src
, dest
, e
in edges2
:
213 if len(str(src
)) != 2 or len(str(dest
)) != 2: continue
214 dotindex
= (~msa
.seqDict
)[src
].index('.')
215 srcspecies
, src1
= (~msa
.seqDict
)[src
][:dotindex
], (~msa
.seqDict
)[src
][dotindex
+1:]
216 dotindex
= (~msa
.seqDict
)[dest
].index('.')
217 destspecies
, dest1
= (~msa
.seqDict
)[dest
][:dotindex
], (~msa
.seqDict
)[dest
][dotindex
+1:]
218 wlist
= str(src
), srcspecies
, src1
, src
.start
, src
.stop
, str(dest
), \
219 destspecies
, dest1
, dest
.start
, dest
.stop
220 saveList
.append('\t'.join(map(str, wlist
)) + '\n')
221 saveList
.sort() # SORTED IN ORDER TO COMPARE WITH PREVIOUS RESULTS
222 for saveline
in saveList
:
223 outfile
.write(saveline
)
225 md5old
= hashlib
.md5()
226 md5old
.update(open(newOutputName
, 'r').read())
227 md5new
= hashlib
.md5()
228 md5new
.update(open(tmpOutputName
, 'r').read())
229 assert md5old
.digest() == md5new
.digest() # MD5 COMPARISON INSTEAD OF COMPARING EACH CONTENTS
232 if __name__
== '__main__':
233 PygrTestProgram(verbosity
=2)