changed BlockGenerator to select multiple columns at once, to handle cases where...
[pygr.git] / pygr / blast.py
blob59350eb7609b0c43250cb8cfdc0232cd4451af13
1 import os, tempfile, glob
2 import classutil, logger
3 from sequtil import *
4 from parse_blast import BlastHitParser
5 from seqdb import write_fasta, read_fasta
6 from nlmsa_utils import CoordsGroupStart, CoordsGroupEnd, CoordsToIntervals,\
7 EmptySlice
8 from annotation import AnnotationDB, TranslationAnnot, TranslationAnnotSlice
9 import cnestedlist
10 import translationDB
11 import UserDict
13 # NCBI HAS THE NASTY HABIT OF TREATING THE IDENTIFIER AS A BLOB INTO
14 # WHICH THEY STUFF FIELD AFTER FIELD... E.G. gi|1234567|foobarU|NT_1234567|...
15 # THIS JUST YANKS OUT THE SECOND ARGUMENT SEPARATED BY |
16 NCBI_ID_PARSER=lambda id:id.split('|')[1]
20 def blast_program(query_type,db_type):
21 progs= {DNA_SEQTYPE:{DNA_SEQTYPE:'blastn', PROTEIN_SEQTYPE:'blastx'},
22 PROTEIN_SEQTYPE:{DNA_SEQTYPE:'tblastn', PROTEIN_SEQTYPE:'blastp'}}
23 if query_type == RNA_SEQTYPE:
24 query_type = DNA_SEQTYPE
25 if db_type == RNA_SEQTYPE:
26 db_type = DNA_SEQTYPE
27 return progs[query_type][db_type]
30 def read_blast_alignment(ofile, srcDB, destDB, al=None, pipeline=None,
31 translateSrc=False, translateDest=False):
32 """Apply sequence of transforms to read input from 'ofile'.
34 srcDB: database for finding query sequences from the blast input;
36 destDB: database for finding subject sequences from the blast input;
38 al, if not None, must be a writeable alignment object in which to
39 store the alignment intervals;
41 translateSrc=True forces creation of a TranslationDB representing
42 the possible 6-frames of srcDB (for blastx, tblastx);
44 translateDest=True forces creation of a TranslationDB representing
45 the possible 6-frames of destDB (for tblastn, tblastx).
47 If pipeline is not None, it must be a list of filter functions each
48 taking a single argument and returning an iterator or iterable result
49 object.
50 """
51 p = BlastHitParser()
52 d = dict(id='src_id', start='src_start', stop='src_end', ori='src_ori',
53 idDest='dest_id', startDest='dest_start',
54 stopDest='dest_end', oriDest='dest_ori')
55 if translateSrc:
56 srcDB = translationDB.get_translation_db(srcDB)
57 if translateDest:
58 destDB = translationDB.get_translation_db(destDB)
59 cti = CoordsToIntervals(srcDB, destDB, d)
60 alignedIvals = cti(p.parse_file(ofile))
61 if pipeline is None:
62 result = save_interval_alignment(alignedIvals, al)
63 else: # apply all the filters in our pipeline one by one
64 result = alignedIvals
65 for f in pipeline:
66 result = f(result)
67 return result
69 def save_interval_alignment(alignedIvals, al=None):
70 """Save alignedIvals to al, or a new in-memory NLMSA"""
71 needToBuild = False
72 if al is None:
73 al = cnestedlist.NLMSA('blasthits', 'memory', pairwiseMode=True,
74 bidirectional=False)
75 needToBuild = True
76 al.add_aligned_intervals(alignedIvals)
77 if needToBuild:
78 al.build()
79 return al
81 def start_blast(cmd, seq, seqString=None, seqDict=None, **kwargs):
82 """Run blast and return results."""
83 p = classutil.FilePopen(cmd, stdin=classutil.PIPE, stdout=classutil.PIPE,
84 **kwargs)
85 if seqString is None:
86 seqString = seq
87 if seqDict is not None: # write all seqs to nonblocking ifile
88 for seqID, seq in seqDict.iteritems():
89 write_fasta(p.stdin, seq)
90 seqID = None
91 else: # just write one query sequence
92 seqID = write_fasta(p.stdin, seqString)
93 if p.wait(): # blast returned error code
94 raise OSError('command %s failed' % ' '.join(cmd))
95 return seqID, p
97 def process_blast(cmd, seq, blastDB, al=None, seqString=None, queryDB=None,
98 popenArgs={}, **kwargs):
99 """Run blast and return an alignment."""
100 seqID,p = start_blast(cmd, seq, seqString, seqDict=queryDB, **popenArgs)
101 try:
102 if not queryDB: # need a query db for translation / parsing results
103 try:
104 queryDB = seq.db # use this sequence's database
105 except AttributeError:
106 queryDB = { seqID : seq } # construct a trivial "database"
108 al = read_blast_alignment(p.stdout, queryDB, blastDB, al, **kwargs)
109 finally:
110 p.close() # close our PIPE files
111 return al
114 def repeat_mask(seq, progname='RepeatMasker', opts=()):
115 'Run RepeatMasker on a sequence, return lowercase-masked string'
116 ## fd, temppath = tempfile.mkstemp()
117 ## ofile = os.fdopen(fd, 'w') # text file
118 p = classutil.FilePopen([progname] + list(opts), stdin=classutil.PIPE,
119 stdinFlag=None)
120 write_fasta(p.stdin, seq, reformatter=lambda x:x.upper()) # save uppercase!
121 try:
122 if p.wait():
123 raise OSError('command %s failed' % ' '.join(p.args[0]))
124 ifile = file(p._stdin_path + '.masked', 'rU') # text file
125 try:
126 for id,title,seq_masked in read_fasta(ifile):
127 break # JUST READ ONE SEQUENCE
128 finally:
129 ifile.close()
130 finally: # clean up our temp files no matter what happened
131 p.close() # close temp stdin file
132 for fpath in glob.glob(p._stdin_path + '.*'):
133 try:
134 os.remove(fpath)
135 except OSError:
136 pass
137 return seq_masked # ONLY THE REPEATS ARE IN LOWERCASE NOW
139 def warn_if_whitespace(filepath):
140 l = filepath.split() # check filepath for whitespace
141 if len(l) > 1 or len(l[0]) < len(filepath): # contains whitespace
142 logger.warn("""
143 Your sequence filepath contains whitespace characters:
145 The NCBI formatdb (and blastall) programs cannot handle file paths
146 containing whitespace! This is a known NCBI formatdb / blastall bug.
147 Please use a path containing no whitespace characters!""" % filepath)
148 return True # signal caller that a warning was issued
151 class BlastMapping(object):
152 'Graph interface for mapping a sequence to homologs in a seq db via BLAST'
153 def __init__(self, seqDB, filepath=None, blastReady=False,
154 blastIndexPath=None, blastIndexDirs=None, verbose=True,
155 showFormatdbMessages=True, **kwargs):
156 '''seqDB: sequence database object to search for homologs
157 filepath: location of FASTA format file for the sequence database
158 blastReady: if True, ensure that BLAST index file ready to use
159 blastIndexPath: location of the BLAST index file
160 blastIndexDirs: list of directories for trying to build index in
162 self.seqDB = seqDB
163 self.idIndex = BlastIDIndex(seqDB)
164 self.verbose = verbose
165 self.showFormatdbMessages = showFormatdbMessages
166 if filepath is not None:
167 self.filepath = filepath
168 else:
169 self.filepath = seqDB.filepath
170 if blastIndexPath is not None:
171 self.blastIndexPath = blastIndexPath
172 if blastIndexDirs is not None:
173 self.blastIndexDirs = blastIndexDirs
174 self.checkdb() # CHECK WHETHER BLAST INDEX FILE IS PRESENT...
175 if not self.blastReady and blastReady: # FORCE CONSTRUCTION OF BLAST DB
176 self.formatdb()
177 def __repr__(self):
178 return "<BlastMapping '%s'>" % (self.filepath)
179 def __getitem__(self, k):
180 'return NLMSASlice representing BLAST results'
181 al = self.__call__(k) # run BLAST & get NLMSA storing results
182 return al[k] # return NLMSASlice representing these results
183 def test_db_location(self, testpath):
184 '''check whether BLAST index files ready for use; return status.'''
185 if not os.access(testpath+'.nsd',os.R_OK) \
186 and not os.access(testpath+'.psd',os.R_OK) \
187 and not os.access(testpath+'.00.nsd',os.R_OK) \
188 and not os.access(testpath+'.00.psd',os.R_OK):
189 return False
190 else: # FOUND INDEX FILES IN THIS LOCATION
191 if testpath != self.filepath:
192 self.blastIndexPath = testpath
193 return True
194 def checkdb(self):
195 'look for blast index files in blastIndexPath, standard list of locations'
196 for testpath in self.blast_index_paths():
197 self.blastReady = self.test_db_location(testpath)
198 if self.blastReady:
199 break
200 return self.blastReady
201 def run_formatdb(self, testpath):
202 'ATTEMPT TO BUILD BLAST DATABASE INDEXES at testpath'
203 dirname = classutil.file_dirpath(testpath)
204 if not os.access(dirname, os.W_OK): # check if directory is writable
205 raise IOError('run_formatdb: directory %s is not writable!'
206 % dirname)
207 cmd = ['formatdb', '-i', self.filepath, '-n', testpath, '-o', 'T']
208 if self.seqDB._seqtype != PROTEIN_SEQTYPE:
209 cmd += ['-p', 'F'] # special flag required for nucleotide seqs
210 logger.info('Building index: ' + ' '.join(cmd))
211 if self.showFormatdbMessages:
212 kwargs = {}
213 else: # suppress formatdb messages by redirecting them
214 kwargs = dict(stdout=classutil.PIPE, stderr=classutil.PIPE)
215 if classutil.call_subprocess(cmd, **kwargs):
216 # bad exit code, so command failed
217 warn_if_whitespace(self.filepath) \
218 or warn_if_whitespace(testpath) # only issue one warning
219 raise OSError('command %s failed' % ' '.join(cmd))
220 self.blastReady=True
221 if testpath!=self.filepath:
222 self.blastIndexPath = testpath
223 def get_blast_index_path(self):
224 'get path to base name for BLAST index files'
225 try:
226 return self.blastIndexPath
227 except AttributeError:
228 return self.filepath
229 # DEFAULT: BUILD INDEX FILES IN self.filepath . HOME OR APPROPRIATE
230 # USER-/SYSTEM-SPECIFIC TEMPORARY DIRECTORY
231 blastIndexDirs = ['FILEPATH',os.getcwd,os.path.expanduser,
232 tempfile.gettempdir()]
233 def blast_index_paths(self):
234 'iterate over possible blast index directories'
235 try: # 1ST TRY ACTUAL SAVED LOCATION IF ANY
236 yield self.blastIndexPath
237 except AttributeError:
238 pass
239 for m in self.blastIndexDirs: # NOW TRY STANDARD LOCATIONS
240 if m=='FILEPATH':
241 yield self.filepath
242 continue
243 elif m == os.path.expanduser:
244 s = m('~') # GET HOME DIRECTORY
245 elif callable(m):
246 s = m()
247 else: # TREAT AS STRING
248 s = str(m)
249 yield os.path.join(s,os.path.basename(self.filepath))
250 def formatdb(self, filepath=None):
251 'try to build BLAST index files in an appropriate location'
252 if filepath is not None: # JUST USE THE SPECIFIED PATH
253 return self.run_formatdb(filepath)
254 notFirst = False
255 for testpath in self.blast_index_paths():
256 if notFirst:
257 logger.info('Trying next entry in self.blastIndexDirs...')
258 notFirst = True
259 try: # BUILD IN TARGET DIRECTORY
260 return self.run_formatdb(testpath)
261 except (IOError,OSError): # BUILD FAILED
262 pass
263 raise IOError, "cannot build BLAST database for %s" % (self.filepath,)
265 def raw_fasta_stream(self, ifile=None, idFilter=None):
266 '''Return a stream of fasta-formatted sequences.
268 Optionally, apply an ID filter function if supplied.
270 if ifile is not None: # JUST USE THE STREAM WE ALREADY HAVE OPEN
271 return ifile,idFilter
272 try: # DEFAULT: JUST READ THE FASTA FILE, IF IT EXISTS
273 return file(self.filepath, 'rU'),idFilter
274 except IOError: # TRY READING FROM FORMATTED BLAST DATABASE
275 cmd='fastacmd -D -d "%s"' % self.get_blast_index_path()
276 return os.popen(cmd),NCBI_ID_PARSER #BLAST ADDS lcl| TO id
279 _blast_prog_dict = dict(blastx='#BlastxMapping')
281 def blast_program(self, seq, blastprog=None):
282 'figure out appropriate blast program & remap via _blast_prog_dict'
283 if blastprog is None:
284 blastprog = blast_program(seq.seqtype(), self.seqDB._seqtype)
285 oldprog = blastprog
286 try: # apply program transformation if provided
287 blastprog = self._blast_prog_dict[blastprog]
288 if blastprog.startswith('#'): # not permitted by this class!
289 raise ValueError('Use %s for %s' % (blastprog[1:],oldprog))
290 except KeyError:
291 pass # no program transformation to apply, so nothing to do...
292 return blastprog
293 def blast_command(self, blastpath, blastprog, expmax, maxseq, opts):
294 'generate command string for running blast with desired options'
295 filepath = self.get_blast_index_path()
296 warn_if_whitespace(filepath)
297 cmd = [blastpath, '-d', filepath, '-p', blastprog,
298 '-e', '%e' % float(expmax)] + list(opts)
299 if maxseq is not None: # ONLY TAKE TOP maxseq HITS
300 cmd += ['-b', '%d' % maxseq, '-v', '%d' % maxseq]
301 return cmd
302 def get_seq_from_queryDB(self, queryDB):
303 'get one sequence obj from queryDB'
304 seqID = iter(queryDB).next() # get 1st seq ID
305 return queryDB[seqID]
307 def translation_kwargs(self, blastprog):
308 'return kwargs for read_blast_alignment() based on blastprog'
309 d = dict(tblastn=dict(translateDest=True),
310 blastx=dict(translateSrc=True),
311 tblastx=dict(translateSrc=True, translateDest=True))
312 try:
313 return d[blastprog]
314 except KeyError:
315 return {}
316 def __call__(self, seq=None, al=None, blastpath='blastall',
317 blastprog=None, expmax=0.001, maxseq=None, verbose=None,
318 opts=(), queryDB=None, **kwargs):
319 "Run blast search for seq in database, return alignment object"
320 if seq is None and queryDB is None:
321 raise ValueError("we need a sequence or db to use as query!")
322 if seq and queryDB:
323 raise ValueError("both a sequence AND a db provided for query")
324 if queryDB is not None:
325 seq = self.get_seq_from_queryDB(queryDB)
326 if not self.blastReady: # HAVE TO BUILD THE formatdb FILES...
327 self.formatdb()
328 blastprog = self.blast_program(seq, blastprog)
329 cmd = self.blast_command(blastpath, blastprog, expmax, maxseq, opts)
330 return process_blast(cmd, seq, self.idIndex, al, queryDB=queryDB,
331 ** self.translation_kwargs(blastprog))
333 class BlastxMapping(BlastMapping):
334 """Because blastx changes the query to multiple sequences
335 (representing its six possible frames), getitem can no longer
336 return a single slice object, but instead an iterator for one
337 or more slice objects representing the frames that had
338 homology hits."""
339 def __repr__(self):
340 return "<BlastxMapping '%s'>" % (self.filepath)
341 _blast_prog_dict = dict(blastn='tblastx', blastp='#BlastMapping',
342 tblastn='#BlastMapping')
343 def __getitem__(self, query):
344 """generate slices for all translations of the query """
345 # generate NLMSA for this single sequence
346 al = self(query)
347 # get the translation database for the sequence
348 tdb = translationDB.get_translation_db(query.db)
350 # run through all of the frames & find alignments.
351 slices = []
352 for trans_seq in tdb[query.id].iter_frames():
353 try:
354 slice = al[trans_seq]
355 except KeyError:
356 continue
358 if not isinstance(slice, EmptySlice):
359 slices.append(slice)
361 return slices
363 class MegablastMapping(BlastMapping):
364 def __repr__(self):
365 return "<MegablastMapping '%s'>" % (self.filepath)
366 def __call__(self, seq, al=None, blastpath='megablast', expmax=1e-20,
367 maxseq=None, minIdentity=None,
368 maskOpts=['-U', 'T', '-F', 'm'],
369 rmPath='RepeatMasker', rmOpts=['-xsmall'],
370 verbose=None, opts=(), **kwargs):
371 "Run megablast search with optional repeat masking."
372 if not self.blastReady: # HAVE TO BUILD THE formatdb FILES...
373 self.formatdb()
375 # mask repeats to lowercase
376 masked_seq = repeat_mask(seq,rmPath,rmOpts)
377 filepath = self.get_blast_index_path()
378 warn_if_whitespace(filepath)
379 cmd = [blastpath] + maskOpts \
380 + ['-d', filepath,
381 '-D', '2', '-e', '%e' % float(expmax)] + list(opts)
382 if maxseq is not None: # ONLY TAKE TOP maxseq HITS
383 cmd += ['-b', '%d' % maxseq, '-v', '%d' % maxseq]
384 if minIdentity is not None:
385 cmd += ['-p', '%f' % float(minIdentity)]
386 return process_blast(cmd, seq, self.idIndex, al, seqString=masked_seq,
387 popenArgs=dict(stdinFlag='-i'))
390 class BlastIDIndex(object):
391 """This class acts as a wrapper around a regular seqDB, and handles
392 the mangled IDs returned by BLAST to translate them to the correct ID.
393 Since NCBI treats FASTA ID as a blob into which they like to stuff
394 many fields... and then NCBI BLAST mangles those IDs when it reports
395 hits, so they no longer match the true ID... we are forced into
396 contortions to rescue the true ID from mangled IDs.
398 Our workaround strategy: since NCBI packs the FASTA ID with multiple
399 IDs (GI, GB, RefSeq ID etc.), we can use any of these identifiers
400 that are found in a mangled ID, by storing a mapping of these
401 sub-identifiers to the true FASTA ID."""
402 def __init__(self, seqDB):
403 self.seqDB = seqDB
404 self.seqInfoDict = BlastIDInfoDict(self)
405 # FOR UNPACKING NCBI IDENTIFIERS AS WORKAROUND FOR BLAST ID CRAZINESS
406 id_delimiter='|'
407 def unpack_id(self,id):
408 """Return |-packed NCBI identifiers as unpacked list.
410 NCBI packs identifier like gi|123456|gb|A12345|other|nonsense.
411 Return as list."""
412 return id.split(self.id_delimiter)
414 def index_unpacked_ids(self,unpack_f=None):
415 """Build an index of sub-IDs (unpacked from NCBI nasty habit
416 of using the FASTA ID as a blob); you can customize the unpacking
417 by overriding the unpack_id function or changing the id_delimiter.
418 The index maps each sub-ID to the real ID, so that when BLAST
419 hands back a mangled, fragmentary ID, we can unpack that mangled ID
420 and look up the true ID in this index. Any sub-ID that is found
421 to map to more than one true ID will be mapped to None (so that
422 random NCBI garbage like gnl or NCBI_MITO wont be treated as
423 sub-IDs).
425 if unpack_f is None:
426 unpack_f=self.unpack_id
427 t={}
428 for id in self.seqDB:
429 for s in unpack_f(id):
430 if s==id: continue # DON'T STORE TRIVIAL MAPPINGS!!
431 s=s.upper() # NCBI FORCES ID TO UPPERCASE?!?!
432 try:
433 if t[s]!=id and t[s] is not None:
434 t[s]=None # s NOT UNIQUE, CAN'T BE AN IDENTIFIER!!
435 except KeyError:
436 t[s]=id # s UNIQUE, TRY USING s AS AN IDENTIFIER
437 for id in t.itervalues():
438 if id is not None: # OK THERE ARE REAL MAPPINGS STORED, SO USE THIS
439 self._unpacked_dict=t # SAVE THE MAPPING TO REAL IDENTIFIERS
440 return
441 # NO NON-TRIVIAL MAPPINGS, SO JUST SAVE EMPTY MAPPING
442 self._unpacked_dict={}
444 def get_real_id(self,bogusID,unpack_f=None):
445 "try to translate an id that NCBI has mangled to the real sequence id"
446 if unpack_f is None:
447 unpack_f=self.unpack_id
448 if not hasattr(self,'_unpacked_dict'):
449 self.index_unpacked_ids(unpack_f)
450 for s in unpack_f(bogusID):
451 s=s.upper() # NCBI FORCES ID TO UPPERCASE?!?!
452 try:
453 id=self._unpacked_dict[s]
454 if id is not None:
455 return id # OK, FOUND A MAPPING TO REAL ID
456 except KeyError:
457 pass # KEEP TRYING...
458 # FOUND NO MAPPING, SO RAISE EXCEPTION
459 raise KeyError, "no key '%s' in database %s" % (bogusID,
460 repr(self.seqDB))
462 def __getitem__(self, seqID):
463 "If seqID is mangled by BLAST, use our index to get correct ID"
464 try: # default: treat as a correct ID
465 return self.seqDB[seqID]
466 except KeyError: # translate to the correct ID
467 return self.seqDB[self.get_real_id(seqID)]
469 def __contains__(self, seqID):
470 try:
471 self.seqInfoDict[seqID]
472 return True
473 except KeyError:
474 return False
476 class BlastIDInfoDict(object, UserDict.DictMixin):
477 """provide seqInfoDict interface for BlastIDIndex """
478 def __init__(self, db):
479 self.blastDB = db
481 def __getitem__(self, seqID):
482 try:
483 return self.blastDB.seqDB.seqInfoDict[seqID]
484 except KeyError:
485 seqID = self.blastDB.get_real_id(seqID)
486 return self.blastDB.seqDB.seqInfoDict[seqID]
488 def __len__(self):
489 return len(self.blastDB.seqDB.seqInfoDict)
491 def __iter__(self):
492 return iter(self.blastDB.seqDB.seqInfoDict)
494 def keys(self):
495 return self.blastDB.seqDB.seqInfoDict.keys()