modified: spffq.py
[GalaxyCodeBases.git] / python / mixture / pileup_parser_classes.py
blob490fdd761f0263834d685bf819bb15f24241ba16
1 #!/usr/bin/env python
3 import re
4 import sys
5 from phred import PhredHelper
7 class Pile:
8 def __init__(self, bases, scores):
9 self.bases = bases
10 self.scores = scores
12 # Operates on a Pile.bases string
13 class PileSanitizer:
14 def __init__(self):
15 self.indel_re = re.compile('[\+\-]')
16 self.cigar1_re = re.compile('\^')
17 self.cigar2_re = re.compile('\$')
19 def sanitize(self, bases):
20 clean_bases = ''
21 skip = 0
22 on_indel = False
23 insert_qual_score = False
24 for base in bases:
25 if on_indel:
26 skip = int(base)
27 on_indel = False
28 continue
29 if skip > 0:
30 skip -= 1
31 continue
32 m1 = self.indel_re.match(base)
33 if m1:
34 on_indel = True
35 continue
36 m2 = self.cigar1_re.match(base)
37 if m2:
38 skip = 1
39 continue
40 m3 = self.cigar2_re.match(base)
41 if m3:
42 continue
43 clean_bases += base
44 return clean_bases
46 # Operates on a Pile object
47 class QualityFilter:
48 def __init__(self, min_qual, offset=33):
49 self.quality_threshold = min_qual
50 self.phred_helper = PhredHelper(offset)
52 def filter(self, pile):
53 keep_indices = []
54 keep_bases = ''
55 keep_scores = ''
56 # build list of indices of bases/scores to keep
57 for index, score in enumerate(pile.scores):
58 int_score = self.phred_helper.char_to_int(score)
59 if int_score >= self.quality_threshold:
60 keep_indices.append(index)
61 #else:
62 #message = "discarding base " + pile.bases[index]
63 #message += " (index " + str(index) + ")"
64 #message += " from pile " + pile.bases + "; qual score = "
65 #message += str(int_score)+"\n"
66 #sys.stderr.write(message)
67 # build new bases and scores strings
68 #print("bases: "+pile.bases)
69 #print("scores: "+pile.scores)
70 #print("keepindices: "+str(keep_indices))
71 for n in keep_indices:
72 keep_bases += pile.bases[n]
73 keep_scores += pile.scores[n]
74 # update pile fields
75 pile.bases = keep_bases
76 pile.scores = keep_scores
78 # Operates on a list of strings (each string is a Pile.bases)
79 class ConsensusCaller:
80 def __init__(self, frequency):
81 self.min_base_frequency = frequency
83 # calls consensus on list of pile.bases strings
84 def call(self, list_of_bases):
85 total_length = 0
86 counts = {'A': 0, 'C': 0, 'T': 0, 'G': 0}
87 for pile in list_of_bases:
88 total_length += len(pile)
89 for base in pile:
90 if base == 'A' or base == 'a':
91 counts['A'] += 1
92 elif base == 'C' or base == 'c':
93 counts['C'] += 1
94 elif base == 'T' or base == 't':
95 counts['T'] += 1
96 elif base == 'G' or base == 'g':
97 counts['G'] += 1
98 ref_base = max(counts.iterkeys(), key=(lambda key: counts[key]))
99 if (float(counts[ref_base]) / total_length) >= self.min_base_frequency:
100 return ref_base
101 else:
102 return None
104 # Takes a list of two lists (0-indexed sample numbers for control and experimental groups)
105 class PileupLineParser:
106 def __init__(self, grouplist):
107 if len(grouplist) != 2:
108 sys.stderr.write("PileupLineParser: grouplist must be a list of 2 lists.")
109 self.control_group = grouplist[0]
110 self.experimental_group = grouplist[1]
112 def get_piles_from_group(self, group, line):
113 piles = []
114 for i in group:
115 starting_index = 3 * (i+1)
116 bases = line[starting_index + 1]
117 scores = line[starting_index + 2]
118 piles.append(Pile(bases, scores))
119 return piles
121 def get_control_piles(self, line):
122 return self.get_piles_from_group(self.control_group, line)
124 def get_experimental_piles(self, line):
125 return self.get_piles_from_group(self.experimental_group, line)
127 def get_all_bases(self, line):
128 all_bases = []
129 number_of_samples = (len(line) - 3) / 3
130 for i in range(number_of_samples):
131 starting_index = 3 * (i+1)
132 bases = line[starting_index + 1]
133 all_bases.append(bases)
134 return all_bases
136 def get_lengths(self, line):
137 lengths = []
138 number_of_samples = (len(line) - 3) / 3
139 for i in range(number_of_samples):
140 length_index = 3 * (i+1)
141 lengths.append(int(line[length_index]))
142 return lengths
144 def validate(self, line, min_length):
145 lengths = self.get_lengths(line)
146 for length in lengths:
147 if length < min_length:
148 return False
149 return True
151 def get_chromosome(self, line):
152 return line[0]
154 def get_coordinate(self, line):
155 return line[1]
157 def generate_locus(self, line):
158 chrom = self.get_chromosome(line)
159 coord = self.get_coordinate(line)
160 ctrl = self.get_control_piles(line)
161 exp = self.get_experimental_piles(line)
162 return Locus(chrom, coord, ctrl, exp)
164 class Locus:
165 def __init__(self, chrom, coord, ctrl_piles, exp_piles):
166 self.chromosome = chrom
167 self.coordinate = coord
168 self.control_piles = ctrl_piles
169 self.experimental_piles = exp_piles
171 def to_string(self):
172 result = "chromosome: " + self.chromosome + "; "
173 result += "coordinate: " + self.coordinate + "; "
174 result += "control piles: " + str(len(self.control_piles)) + "; "
175 result += "experimental piles: " + str(len(self.experimental_piles))
176 return result
178 def sanitize_all(self):
179 sani = PileSanitizer()
180 for pile in self.control_piles:
181 pile.bases = sani.sanitize(pile.bases)
182 for pile in self.experimental_piles:
183 pile.bases = sani.sanitize(pile.bases)
185 def filter_all(self, qscore, offset):
186 filt= QualityFilter(qscore, offset)
187 for pile in self.control_piles:
188 filt.filter(pile)
189 for pile in self.experimental_piles:
190 filt.filter(pile)
192 def validate_depth(self, depth):
193 for pile in self.control_piles:
194 if len(pile.scores) < depth:
195 return False
196 for pile in self.experimental_piles:
197 if len(pile.scores) < depth:
198 return False
199 return True
201 def call_consensus(self, frequency):
202 # only looks at control piles
203 caller = ConsensusCaller(frequency)
204 ctrl_bases = []
205 for pile in self.control_piles:
206 ctrl_bases.append(pile.bases)
207 base = caller.call(ctrl_bases)
208 return base
210 def generate_stats(self, called):
211 ref = str(called).lower()
212 ctrl_all_stats = []
213 exp_all_stats = []
214 ctrl_match = 0
215 ctrl_total = 0
216 exp_match = 0
217 exp_total = 0
218 match = 0
219 total = 0
220 for pile in self.control_piles:
221 ctrl_total += len(pile.bases)
222 for base in pile.bases:
223 total += 1
224 if base.lower() == ref:
225 match += 1
226 ctrl_match += 1
227 ctrl_all_stats.extend([match, total])
228 match = 0
229 total = 0
230 for pile in self.experimental_piles:
231 exp_total += len(pile.bases)
232 for base in pile.bases:
233 total += 1
234 if base.lower() == ref:
235 match += 1
236 exp_match += 1
237 exp_all_stats.extend([match, total])
238 match = 0
239 total = 0
240 result = [self.chromosome+"_locus"+self.coordinate]
241 result.extend([ctrl_match, ctrl_total, float(ctrl_match)/ctrl_total, '|'])
242 result.extend([exp_match, exp_total, float(exp_match)/exp_total, '|'])
243 result.extend(ctrl_all_stats)
244 result.append('|')
245 result.extend(exp_all_stats)
246 return result