Merge branch 'slowbisect'
[greylag.git] / greylag_rally.py
blob8af2b147b1744ec97f13440cfda3775574b2ac70
1 #!/usr/bin/env python
3 '''This program coordinates the search of mass spectra by sending subsearches
4 to running instances of greylag-chase, which are expected to be listening on
5 the specified hosts/ports.
7 <job-id> is a unique identifier used as a prefix for work files. The
8 <configuration-file> contains program options. The spectra are in the
9 <ms2-file>s.
11 '''
13 from __future__ import with_statement
15 __copyright__ = '''
16 greylag, a collection of programs for MS/MS protein analysis
17 Copyright (C) 2006-2008 Stowers Institute for Medical Research
19 This program is free software: you can redistribute it and/or modify
20 it under the terms of the GNU General Public License as published by
21 the Free Software Foundation, either version 3 of the License, or
22 (at your option) any later version.
24 This program is distributed in the hope that it will be useful,
25 but WITHOUT ANY WARRANTY; without even the implied warranty of
26 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
27 GNU General Public License for more details.
29 You should have received a copy of the GNU General Public License
30 along with this program. If not, see <http://www.gnu.org/licenses/>.
32 Contact: Mike Coleman
33 Stowers Institute for Medical Research
34 1000 East 50th Street
35 Kansas City, Missouri 64110
36 USA
37 '''
40 import asynchat
41 import asyncore
42 import ConfigParser
43 import cPickle as pickle
44 import errno
45 import exceptions
46 import heapq
47 import itertools
48 import logging; from logging import debug, info, warning
49 import optparse
50 import os
51 from pprint import pprint, pformat
52 import re
53 import socket
54 import sys
55 import tempfile
56 import time
58 from greylag import *
61 # gc possibly harms performance here, so disable it. gc only matters for
62 # cycles, which we (hope we) don't create. See the gc module docs.
63 import gc; gc.disable()
def error(s, *args):
    """Report a fatal error: log it and exit.

    When this module is imported (e.g. under unit test) rather than run as a
    script, raise an exception instead of killing the interpreter.
    """
    if __name__ == "__main__":
        logging.error(s, *args)
        sys.exit(1)
    raise Exception((s + " (fatal error)") % args)
# An error is fatal for rally; chase, by contrast, just disconnects when the
# same condition arises.
chase_error = error

# name -> value map of processed greylag config parameters
GLP = {}
81 # FIX: This parsing is way too complex. How to simplify?
def mass_regime_part(part_specification):
    """Parse a single mass regime specification part.

    Returns a tuple (regime_id, [(isotope_id, prevalence), ...]), where
    regime_id is 'MONO' or 'AVG' and prevalence is a proportion in [0, 1].
    Raises ValueError on any malformed specification.

    >>> mass_regime_part('MONO')
    ('MONO', [])
    >>> mass_regime_part('AVG')
    ('AVG', [])
    >>> mass_regime_part('MONO(N15@87.5%)')
    ('MONO', [('N15', 0.875)])

    """
    ps = [ x.strip() for x in part_specification.partition('(') ]
    if ps[0] not in ('MONO', 'AVG'):
        raise ValueError("invalid mass regime list specification"
                         " (regime id must be 'MONO' or 'AVG')")
    if not ps[1]:
        # no '(' present: a bare regime id with no isotope spec
        return (ps[0], [])
    # FIXED: guard against empty suffix (e.g. 'MONO(') which previously
    # raised IndexError instead of the intended ValueError
    if not ps[2] or ps[2][-1] != ')':
        raise ValueError("invalid mass regime list specification"
                         " (expected ')')")
    pps = [ x.strip() for x in ps[2][:-1].split(',') ]
    if len(pps) > 1:
        raise ValueError("invalid mass regime list specification"
                         " (multiple isotopes not yet implemented)")
    ppps = [ x.strip() for x in pps[0].partition('@') ]
    if not ppps[1]:
        raise ValueError("invalid mass regime list specification"
                         " (expected '@')")
    if ppps[0] not in ('N15',):
        raise ValueError("invalid mass regime list specification"
                         " (isotope id must currently be 'N15')")
    # FIXED: guard against empty prevalence (e.g. 'MONO(N15@)') which
    # previously raised IndexError instead of ValueError
    if not ppps[2] or ppps[2][-1] != '%':
        raise ValueError("invalid mass regime list specification"
                         " (expected '%')")
    prevalence = float(ppps[2][:-1]) / 100
    if not (0 <= prevalence <= 1):
        raise ValueError("invalid mass regime list specification"
                         " (prevalence must be in range 0-100%)")
    return (ps[0], [(ppps[0], prevalence)])
def mass_regime_list(mass_regime_list_specification):
    """Check and return a list of regime tuples (parent_regime,
    fragment_regime), where each regime is a tuple (id, [(isotope_id,
    prevalence), ...]).  Multiple isotopes (when implemented) would be
    comma-separated.

    >>> pprint(mass_regime_list('AVG/MONO;MONO;MONO(N15@75%)'))
    [[('AVG', []), ('MONO', [])],
     [('MONO', []), ('MONO', [])],
     [('MONO', [('N15', 0.75)]), ('MONO', [('N15', 0.75)])]]

    """
    result = []
    for regime_spec in mass_regime_list_specification.split(';'):
        sides = [ side.strip() for side in regime_spec.split('/') ]
        if len(sides) > 2:
            raise ValueError("invalid mass regime list specification"
                             " (too many '/'?)")
        regime_pair = [ mass_regime_part(side) for side in sides ]
        if len(regime_pair) == 1:
            # a lone regime serves as both parent and fragment regime
            regime_pair = [ regime_pair[0], regime_pair[0] ]
        result.append(regime_pair)
    # The first fragmentation regime should generally be MONO, so that
    # formulaic deltas with '!' do the expected thing.
    if result[0][1] != ('MONO', []):
        raise ValueError("first fragmentation regime was something other than"
                         " 'MONO' with no isotopes--this is almost certainly"
                         " not what was intended")
    return result
def parse_mod_term(s, is_potential=False):
    """Parse a modification term, returning a tuple (sign, mod, fixed_regime,
    residues, description, marker).  The marker character must be printable
    and non-alphabetic.

    >>> parse_mod_term('-C2H3ON!@C')
    (-1, 'C2H3ON', True, 'C', None, None)
    >>> parse_mod_term("42@STY phosphorylation '*'", is_potential=True)
    (1, 42.0, False, 'STY', 'phosphorylation', '*')

    """
    # groups: 0 sign, 1 delta, 2 numeric delta, 3 formulaic delta,
    #         4 '!' flag, 5 residues, 7 description, 9 marker
    match = re.match(r'^\s*(-|\+)?(([1-9][0-9.]*)|([A-Z][A-Z0-9]*))(!?)'
                     r'@([A-Z]+|\[|\])(\s+([A-Za-z0-9_]+))?'
                     r"(\s+'([[-`!-@{-~])')?\s*$", s)
    if not match:
        raise ValueError("invalid modification term specification"
                         " '%s'" % s)
    mg = match.groups()
    invalid_residues = set(mg[5]) - set(RESIDUES_W_BRACKETS)
    if invalid_residues:
        raise ValueError("invalid modification list specification"
                         " (invalid residues %s)" % list(invalid_residues))
    delta = mg[1]
    if mg[2]:
        # numeric (rather than formulaic) delta
        delta = float(mg[1])
        if is_potential and abs(delta) < 0.0001:
            raise ValueError("invalid modification list specification"
                             " (delta '%s' is too small)" % delta)
    residues = mg[5]
    if not is_potential and len(residues) != 1:
        raise ValueError("invalid modification list specification '%s' (only"
                         " potential modifications may have multiple residues)"
                         % residues)
    if len(residues) != len(set(residues)):
        raise ValueError("invalid modification list specification"
                         " '%s' (duplicate residues prohibited)"
                         % residues)
    sign = -1 if mg[0] == '-' else 1
    return (sign, delta, mg[4] == '!', residues, mg[7], mg[9])
def fixed_mod_list(specification):
    """Check and return a list of modification tuples.

    >>> fixed_mod_list('57@C')
    [(1, 57.0, False, 'C', None, None)]
    >>> fixed_mod_list("57@C '*',CH!@N desc")
    [(1, 57.0, False, 'C', None, '*'), (1, 'CH', True, 'N', 'desc', None)]
    >>> fixed_mod_list('57@C,58@C')
    Traceback (most recent call last):
        ...
    ValueError: invalid modification list specification '['C', 'C']' (duplicate residues prohibited)

    """
    if not specification:
        return []
    terms = [ parse_mod_term(term) for term in specification.split(',') ]
    residues = [ term[3] for term in terms ]
    # this check is across terms; the one in parse_mod_term is within terms
    if len(residues) != len(set(residues)):
        raise ValueError("invalid modification list specification"
                         " '%s' (duplicate residues prohibited)"
                         % residues)
    debug("fixed_mod_list:\n%s", pformat(terms))
    return terms
def parse_mod_basic_expression(s):
    """Parse s looking for either a term or a parenthesized subexpression.
    Return the pair (tree of modification tuples, unparsed suffix of s).
    """
    s = s.strip()
    if s[0] == '(':
        # parenthesized subexpression: recurse, then demand the closing ')'
        tree, rest = parse_mod_disjunction(s[1:])
        rest = rest.lstrip()
        if rest[:1] != ')':
            raise ValueError("invalid modification list specification"
                             " (expected matching ')')")
        return tree, rest[1:]
    # otherwise split off a single term at the first delimiter, if any
    parts = re.split(r'([;,()])', s, 1)
    if len(parts) == 1:
        term, rest = s, ''
    else:
        assert len(parts) == 3
        term, rest = parts[0], parts[1]+parts[2]
    return parse_mod_term(term, is_potential=True), rest
def parse_mod_conjunction(s):
    """Parse s looking for a ','-separated sequence of subexpressions.  Return
    the pair (tree of modification tuples, unparsed suffix of s).
    """
    conjuncts = []
    while s:
        subtree, s = parse_mod_basic_expression(s)
        conjuncts.append(subtree)
        s = s.lstrip()
        if not s.startswith(','):
            break                       # no further conjunct follows
        s = s[1:]
    return conjuncts, s
def parse_mod_disjunction(s):
    """Parse s looking for a ';'-separated sequence of subexpressions.  Return
    the pair (tree of modification tuples, unparsed suffix of s).
    """
    disjuncts = []
    while s:
        subtree, s = parse_mod_conjunction(s)
        disjuncts.append(subtree)
        s = s.lstrip()
        if not s.startswith(';'):
            break                       # no further disjunct follows
        s = s[1:]
    return disjuncts, s
def potential_mod_list(specification):
    """Check and return a tree of potential modification tuples.  Nodes at
    even (odd) levels are disjunctions (conjunctions).  (The top list is a
    disjunction.)

    >>> potential_mod_list('')
    []
    >>> pprint(potential_mod_list('PO3H@STY; C2H2O@KST'))
    [[(1, 'PO3H', False, 'STY', None, None)],
     [(1, 'C2H2O', False, 'KST', None, None)]]
    >>> pprint(potential_mod_list('PO3H@STY, C2H2O@KST'))
    [[(1, 'PO3H', False, 'STY', None, None),
      (1, 'C2H2O', False, 'KST', None, None)]]
    >>> pprint(potential_mod_list('''(PO3H@STY phosphorylation '*';
    ...                               C2H2O@KST acetylation '^';
    ...                               CH2@AKST methylation '#'),
    ...                              O@M oxidation '@'
    ...                           '''))
    [[[[(1, 'PO3H', False, 'STY', 'phosphorylation', '*')],
       [(1, 'C2H2O', False, 'KST', 'acetylation', '^')],
       [(1, 'CH2', False, 'AKST', 'methylation', '#')]],
      (1, 'O', False, 'M', 'oxidation', '@')]]

    """
    if not specification:
        return []
    tree, leftover = parse_mod_disjunction(specification)
    if leftover:
        # parsing stopped early--whatever remains is not valid syntax
        raise ValueError("invalid modification list specification"
                         " (unexpected '%s')" % leftover)
    debug("potential_mod_list:\n%s", pformat(tree))
    return tree
# Small value predicates used as check_fn entries in PARAMETER_INFO.
def p_positive(x):
    "True iff x is strictly positive."
    return x > 0

def p_negative(x):
    "True iff x is strictly negative."
    return x < 0

def p_nonnegative(x):
    "True iff x is zero or positive."
    return x >= 0

def p_proportion(x):
    "True iff x lies in the closed interval [0, 1]."
    return 0 <= x <= 1
# Configuration file parameter specification
#
# name -> (type, default, check_fn)
# type may be bool, int, float, str, modlist, or a tuple of values
# default, as a string value, or None if value must be explicitly specified
# check_fn is an optional function that returns True iff the value is valid

PARAMETER_INFO = {
    "databases" : (str, None),
    "mass_regimes" : (mass_regime_list, "MONO"),
    "mass_regime_debug_delta" : (float, 0),
    "pervasive_mods" : (fixed_mod_list, ""),
    "potential_mods" : (potential_mod_list, ""),
    "potential_mod_limit" : (int, 2, p_nonnegative),
    "enable_pca_mods" : (bool, "yes"),
    "charge_limit" : (int, 3, p_positive),
    "min_peptide_length" : (int, 5, p_positive),
    "cleavage_motif" : (str, "[X]|[X]"),
    "maximum_missed_cleavage_sites" : (int, 1, p_nonnegative),
    "min_parent_spectrum_mass" : (float, 0, p_nonnegative),
    "max_parent_spectrum_mass" : (float, 10000, p_nonnegative),
    "TIC_cutoff_proportion" : (float, 0.98, p_proportion),
    "parent_mz_tolerance" : (float, 1.25, p_nonnegative),
    "fragment_mass_tolerance" : (float, 0.5, p_nonnegative),
    "intensity_class_count" : (int, 3, p_positive),
    "intensity_class_ratio" : (float, 2.0, p_positive), # really > 1.0?
    "best_result_count" : (int, 5, p_positive),
    }
342 def validate_parameters(parameters, parameter_info=PARAMETER_INFO):
343 """Verify that parameters are valid, have valid values, and correspond to
344 currently implemented functionality. Values are converted, default values
345 are filled in, and the resulting name/value dict returned.
347 >>> sorted(validate_parameters({'spectrum, total peaks' : '40'},
348 ... {'spectrum, total peaks'
349 ... : (int, '50', p_positive),
350 ... 'output, spectra'
351 ... : (bool, 'no')}).items())
352 [('output, spectra', False), ('spectrum, total peaks', 40)]
356 pmap = {}
357 for p_name, p_info in sorted(parameter_info.items()):
358 type_ = p_info[0]
359 default = p_info[1]
360 check_fn = p_info[2] if len(p_info) > 2 else None
362 v = parameters.get(p_name)
363 if v == None:
364 if default != None:
365 debug("parameter '%s' defaulting to '%s'", p_name, default)
366 v = default
367 else:
368 error("missing required parameter '%s'" % p_name)
369 if isinstance(type_, tuple):
370 if not v in type_:
371 error("parameter '%s' value '%s' not in %s (feature not"
372 " implemented?)" % (p_name, v, type_))
373 elif type_ == bool:
374 v = { 'yes' : True, 'no' : False }.get(v)
375 if v == None:
376 error("parameter '%s' requires a value of 'yes' or 'no'")
377 else:
378 try:
379 v = type_(v)
380 except ValueError, e:
381 error("parameter '%s' has value '%s' with invalid format [%s]"
382 % (p_name, v, e))
383 if check_fn and not check_fn(v):
384 error("parameter '%s' has invalid value '%s' (or feature not"
385 " implemented)" % (p_name, v))
386 pmap[p_name] = v
388 unknown_parameters = set(parameters) - set(parameter_info)
389 if unknown_parameters:
390 warning("%s unknown parameter(s):\n %s", len(unknown_parameters),
391 pformat(sorted(list(unknown_parameters))))
392 return pmap
def file_contents(filename):
    """Return the entire contents of the named file as a single string."""
    with open(filename) as f:
        contents = f.read()
    return contents
# about 100/s, probably fast enough (just transmitting takes longer?)
def generate_spectra_from_ms2_files(spectrum_fns):
    """Read ms2 files, yielding tuples (file_index, physical_index, index,
    name, mass, charge, peaks).  The indexes are 0-based.  Mass and charge are
    returned as strings, and peaks is the newline-embedded peak block from the
    file.

    Several headers may precede a single peak block (a "physical" spectrum);
    each such header is yielded as its own spectrum, sharing that block's
    physical_index.  Calls error() on malformed input.
    """
    # each spectrum header starts with ':' at the beginning of a line
    header_re = re.compile(r'^:', re.MULTILINE)

    # check open-ability of all files immediately, to avoid surprises later
    for fn in spectrum_fns:
        open(fn).close()

    physical_index = 0                  # counts distinct peak blocks
    index = 0                           # counts yielded (header) spectra
    pending_headers = []                # headers seen since last peak block
    for file_index, spfn in enumerate(spectrum_fns):
        contents = file_contents(spfn)
        # split drops the leading (pre-first-':') chunk via [1:]
        for block in header_re.split(contents)[1:]:
            b = block.split('\n', 2)
            if len(b) != 3:
                error("malformed ms2 file '%s' (malformed header)" % spfn)
            name, h_line1, peaks = b
            h = h_line1.split(' ', 1)
            if len(h) != 2:
                error("malformed ms2 file '%s' (malformed mass/charge header)"
                      % spfn)
            mass, charge = h
            pending_headers.append((name, mass, charge))
            if peaks:
                # a peak block completes all pending headers
                for name, mass, charge in pending_headers:
                    yield (file_index, physical_index, index,
                           name, mass, charge, peaks)
                    index += 1
                pending_headers = []
                physical_index += 1
        if pending_headers:
            error("malformed ms2 file '%s' (headers at end?)" % spfn)
def count_ms2_spectra(spectrum_fns):
    """Return count of ms2 spectra, for ETA calculation, etc.

    Every ':' character in an ms2 file introduces one spectrum header.
    """
    total = 0
    for fn in spectrum_fns:
        total += file_contents(fn).count(':')
    return total
446 # This uses too much RAM...
447 # def check_ms2_files(spectrum_fns):
448 # """Verify that the spectrum files are well-formed, calling error if not."""
450 # ms2_re = re.compile(r'''
451 # ^(
452 # (:.*\n
453 # [1-9]\d*\.\d+\ [1-9]\d*\r?\n)+
454 # (\d+\.\d+\ \d+(\.\d+)?\r?\n)+
455 # )+$
456 # ''', re.VERBOSE)
458 # for file_index, spfn in enumerate(spectrum_fns):
459 # with open(spfn) as specfile:
460 # contents = specfile.read()
461 # info("SKIP checking '%s' (%s bytes)", spfn, len(contents))
462 # #if not ms2_re.match(contents):
463 # # error("invalid ms2 file '%s'", spfn)
466 # FIX: optionally print peak statistics? (slow?)
467 # def peak_statistics(spectra):
468 # counts = [ len(sp.peaks) for sp in spectra ]
469 # counts.sort()
470 # n = len(counts)
471 # return (counts[0], counts[int(n*0.25)], counts[int(n*0.5)],
472 # counts[int(n*0.75)], counts[-1], sum(counts) / float(n))
474 # def print_spectrum_statistics(spectra):
475 # info(" %s spectra (mass range %s - %s)", len(spectra),
476 # spectra[0].mass, spectra[-1].mass)
477 # info(" peak stats: %s..%s..%s..%s..%s (mean=%.2f)"
478 # % peak_statistics(spectra))
class chase_client(asynchat.async_chat):
    """Asynchronous connection to one greylag-chase worker.

    Search bookkeeping (spectra still to search, results received, dead
    worker list, etc.) lives in class-level attributes shared by all
    instances; each instance drives a single worker connection.
    """

    # heapq of (deathtime, host, port) for connections that have died or
    # timed-out.  deathtime is the time of death.  It's up to external code to
    # recreate after a suitable wait, if desired.
    dead_clients = []

    # cached so that we don't have thousands of identical copies
    pickled_parameters = None
    pickled_sequences = None

    # iterator over the never-yet-submitted spectra
    spectrum_generator = None

    # (host, port) -> (submittime, [ spectrum, ... ])
    # tracks the spectra each chase_client is currently searching
    spectra_in_progress = {}

    # heapq of (lastsubmittime, spectrum) for spectra submitted (at least
    # once) and (possibly) not yet completed.  lastsubmittime is provided so
    # that we can submit the least recently submitted spectrum next.
    # Not populated until spectrum_generator has been exhausted (to avoid
    # keeping lots of spectra in memory).
    spectrum_queue = None

    # set of spectrum_no's of spectra already searched (only those searched
    # since we populated spectrum_queue)
    searched_spectra = set()

    # number of spectra for which results not yet received
    spectra_to_go = None

    # The number of spectra submitted to clients for a search command.  This
    # can be adjusted upwards for efficiency.
    spectrum_batch_size = 1

    # file on which to write pickled results
    result_file = None
    # index of result_file
    # [ (spectrum_id, offset, length), ... ]
    result_file_index = []

    @classmethod
    def set_parameters(self, parameters):
        # pickle once, up front, so every client pushes the same bytes
        assert self.pickled_parameters == None, "no change once set"
        self.pickled_parameters = pickle.dumps(parameters)

    @classmethod
    def set_sequences(self, sequences):
        # pickle once, up front, so every client pushes the same bytes
        assert self.pickled_sequences == None, "no change once set"
        self.pickled_sequences = pickle.dumps(sequences)

    @classmethod
    def set_result_file(self, result_file):
        # file object to which pickled results will be appended
        assert self.result_file == None, "no change once set"
        self.result_file = result_file

    @classmethod
    def set_spectra(self, spectrum_generator, spectra_to_go):
        """Fix the list of spectra to be searched, which must be done before
        any clients are created (and cannot be changed later).
        """
        assert self.spectrum_generator == None, "no change once set"
        self.spectrum_generator = spectrum_generator
        self.spectra_to_go = spectra_to_go

    @classmethod
    def _remember_result(self, spectrum_id, result):
        # Append the pickled result to result_file, record its position in
        # result_file_index, and retire the spectrum.
        offset = self.result_file.tell()
        pickle.dump(result, self.result_file, pickle.HIGHEST_PROTOCOL)
        length = self.result_file.tell() - offset
        self.result_file_index.append((spectrum_id, offset, length))
        self.searched_spectra.add(spectrum_id)
        self.spectra_to_go -= 1


    def __init__(self, host, port):
        asynchat.async_chat.__init__(self)
        # class-level setup must already have happened
        assert self.pickled_parameters and self.pickled_sequences
        assert self.spectrum_generator
        self.host = host
        self.port = port

        self.ibuffer = []               # accumulated incoming data fragments
        self.set_terminator('\n')       # protocol replies are line-oriented

        self.banner = None              # worker's greeting line, once seen

        self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
        self.set_keep_alive()
        self.connect((host, port))

    # override asynchat version to use a reasonable buffer size
    def push(self, data):
        self.producer_fifo.push(
            asynchat.simple_producer(data, buffer_size=self.ac_out_buffer_size))
        self.initiate_send()

    def __repr__(self):
        return "<%s connected to %s:%s>" % (self.__class__.__name__,
                                            self.host, self.port)


    def set_keep_alive(self):
        # try to better notice reboots, net failures, etc
        try:
            self.socket.setsockopt(
                socket.SOL_SOCKET, socket.SO_KEEPALIVE,
                self.socket.getsockopt(socket.SOL_SOCKET,
                                       socket.SO_KEEPALIVE) | 1)
        except socket.error:
            pass


    def _send_command(self, command, pickled_argument):
        # wire format: "<command> <pickle-length>\n<pickle-bytes>"
        self.push(command + ' ')
        self.push("%s\n" % len(pickled_argument))
        self.push(pickled_argument)

    def _submit_search(self):
        "send a search to client"

        batch_size = self.spectrum_batch_size

        # start slow, to get chasers going immediately, then increase batch
        # size to increase master efficiency (and level load)
        #batch_size = min(self.spectrum_batch_size,
        #                 4**min(self.submit_count-1,10))

        #if self.spectrum_queue:
        #    # on each retry round, divide batch size by four
        #    batch_size = max(1, (batch_size
        #                         / 2**min(64, 2*self.spectrum_queue[0][0])))

        # get some never-searched spectra, if there are any
        submit_spectra = list(itertools.islice(self.spectrum_generator,
                                               batch_size))
        if submit_spectra:
            self.spectra_in_progress[(self.host, self.port)] = (time.time(),
                                                                submit_spectra)
        else:
            if self.spectrum_queue == None:
                # generator just exhausted for the first time:
                # move spectra_in_progress to spectrum_queue
                self.__class__.spectrum_queue = []
                for t, subsp in self.spectra_in_progress.values():
                    for sp in subsp:
                        heapq.heappush(self.spectrum_queue, (t, sp))
                self.__class__.spectra_in_progress = None

            # generate submit_spectra from spectrum_queue
            while self.spectrum_queue and len(submit_spectra) < batch_size:
                _submittime, spectrum = heapq.heappop(self.spectrum_queue)
                # spectrum = (file_index, physical_index, index, name, mass,
                #             charge, peaks)
                if spectrum[2] in self.searched_spectra:
                    continue            # already have search result
                # re-queue with a fresh submit time, in case this attempt
                # also fails to complete
                heapq.heappush(self.spectrum_queue, (time.time(), spectrum))
                submit_spectra.append(spectrum)

        if submit_spectra:
            debug("submitting %s spectra (%s not yet retired)",
                  len(submit_spectra), self.spectra_to_go)
            self._send_command('spectra', pickle.dumps(submit_spectra))
            self._send_command('search', pickle.dumps(None))
        else:
            # nothing left to do--drop this connection
            self.close()

    def _receive_response(self):
        # return (and clear) all input accumulated since the last terminator
        r = ''.join(self.ibuffer)
        self.ibuffer = []
        return r


    def handle_connect(self):
        debug("connecting to %s:%s", self.host, self.port)

    def handle_expt(self):
        pass

    def handle_error(self):
        #self.handle_close()
        exc, why, _traceback = sys.exc_info()
        self.connected = False
        if exc == exceptions.KeyboardInterrupt: # FIX: works?
            error("received keyboard interrupt")
        if exc == socket.error:
            if why[0] == errno.ECONNREFUSED:
                debug("no chaser at %s:%s (connection refused)", self.host,
                      self.port)
            else:
                info("network error on connection %s:%s (%s %s)", self.host,
                     self.port, exc, why)
                debug(" traceback: %s", asyncore.compact_traceback()[3])
        else:
            info("unexpected error on connection %s:%s (%s %s)", self.host,
                 self.port, exc, why)
            info(" traceback: %s", asyncore.compact_traceback()[3])


    def handle_close(self):
        # FIX
        #self.__class__.dead_clients.append((time.time(), self.host, self.port))
        # record the death so the main loop can retry this worker later
        self.dead_clients.append((time.time(), self.host, self.port))
        asynchat.async_chat.handle_close(self)


    def collect_incoming_data(self, data):
        self.ibuffer.append(data)

    def found_terminator(self):
        # Reply state machine: the first full line is the worker banner;
        # thereafter a "found <n>" line switches the terminator to a byte
        # count so the next call consumes an <n>-byte pickled result block.
        if self.banner == None:
            self.banner = self._receive_response()
            banner_words = self.banner.split()
            if banner_words[0] != 'greylag':
                warning("%s:%s is not a greylag-chase", self.host, self.port)
                self.handle_close()
                return
            if banner_words[2:3] != ['ready']:
                # worker is busy serving some other rally
                debug("greylag-chase %s:%s is serving %s", self.host,
                      self.port, banner_words[3:4])
                self.handle_close()
                return
            # now we can start talking
            self._send_command('parameters', self.pickled_parameters)
            self._send_command('sequences', self.pickled_sequences)
            self._submit_search()
            return

        if self.get_terminator() == '\n':
            reply = self._receive_response()
            reply_words = reply.split()
            if reply_words[0] == 'error':
                warning("%s:%s gave '%s'", self.host, self.port, reply)
                self.handle_close()
                return

            assert reply_words[0] == 'found' and len(reply_words) == 2
            # read the announced number of pickled-result bytes next
            self.set_terminator(int(reply_words[1]))
            return

        result = pickle.loads(self._receive_response())

        #debug("received: %s" % result)

        # FIX: check that multiply searched spectra had same results
        for noise_sp_id in result['noise_spectrum_ids']:
            if noise_sp_id not in self.searched_spectra:
                self._remember_result(noise_sp_id, False)

        for sp_id, sp_matches in result['matches'].iteritems():
            if sp_id not in self.searched_spectra:
                self._remember_result(sp_id, sp_matches)

        # back to line mode, and ask for more work
        self.set_terminator('\n')
        self._submit_search()
def main(args=sys.argv[1:]):
    """Command-line entry point: parse options, read the configuration and
    sequence databases, distribute the spectra to greylag-chase workers, and
    merge their results into a single '.glw' result file.
    """
    parser = optparse.OptionParser(usage=
                                   "usage: %prog [options]"
                                   " <configuration-file> <ms2-file>...",
                                   description=__doc__, version=VERSION)
    pa = parser.add_option
    DEFAULT_HOSTFILE = "/etc/greylag/hosts"
    pa("--hostfile", dest="hostfile", default=DEFAULT_HOSTFILE,
       help="file listing host:port locations where greylag-chase workers are"
       " listening. [default '%s']" % DEFAULT_HOSTFILE)
    pa("--job-id", dest="job_id", default="unknown",
       help="used to generate unique output filenames [default 'unknown']")
    #pa("--estimate", action="store_true", dest="estimate_only",
    #   help="just estimate the time required for the search")
    pa("-v", "--verbose", action="store_true", dest="verbose",
       help="be verbose")
    pa("-q", "--quiet", action="store_true", dest="quiet", help="no warnings")
    pa("-l", "--logfile", dest="logfile",
       help="log to FILE instead of stderr", metavar="FILE")
    pa("-P", "--parameter", dest="parameters", action="append",
       default=[],
       help="override a parameter in <configuration-file>, may be used"
       " multiple times", metavar="NAME=VALUE")
    pa("--copyright", action="store_true", dest="copyright",
       help="print copyright and exit")
    pa("--debug", action="store_true", dest="debug",
       help="output debugging info")
    pa("--profile", action="store_true", dest="profile",
       help="dump Python profiling output to './greylag-rally.prof.<pid>'")
    (options, args) = parser.parse_args(args=args)

    if options.copyright:
        print __copyright__
        sys.exit(0)

    # need at least <configuration-file> and one <ms2-file>
    if len(args) < 2:
        parser.print_help()
        sys.exit(1)

    configuration_fn = args[0]
    spectrum_fns = args[1:]

    # all spectrum files must be .ms2, and -P overrides must be NAME=VALUE
    if (any(True for f in spectrum_fns if not f.endswith('.ms2'))
        or any(True for p in options.parameters if '=' not in p)):
        parser.print_help()
        sys.exit(1)

    set_logging(options)

    info("starting on %s", socket.gethostname())

    # check early that we can open result files
    result_fn = 'chase_%s.glw' % options.job_id
    result_file = open(result_fn, 'wb')
    temp_result_file = tempfile.TemporaryFile(dir='.', prefix=result_fn+'-')

    # read chaser (host, port) listener list
    try:
        with open(options.hostfile) as hostf:
            hosts = [ l.split(':', 1) for l in hostf ]
            hosts = [ (host, int(port)) for host, port in hosts ]
    except ValueError:
        error("invalid or empty host line in '%s'" % options.hostfile)

    if not hosts:
        error("no valid search hosts specified")

    # check spectrum basename uniqueness, as corresponding sqt files will be
    # in a single directory
    base_spectrum_fns = [ os.path.basename(fn) for fn in spectrum_fns ]
    if len(base_spectrum_fns) != len(set(base_spectrum_fns)):
        error("base spectrum filenames must be unique")

    options.parameters = [ p.split('=', 1) for p in options.parameters ]

    # check -P names for validity
    bad_names = (set(n for n,v in options.parameters)
                 - set(PARAMETER_INFO.keys()))
    if bad_names:
        error("bad -P parameter names %s" % list(bad_names))

    # read params
    cp = ConfigParser.RawConfigParser()
    cp.optionxform = str                # be case-sensitive
    with open(configuration_fn) as configuration_file:
        cp.readfp(configuration_file)
    if not cp.has_section('greylag'):
        error("%s has no [greylag] section" % configuration_fn)
    parameters = dict(cp.items('greylag'))
    parameters.update(dict(options.parameters)) # command-line override
    global GLP
    GLP = validate_parameters(parameters)

    # test parse cleavage motif here, so we notice errors early
    cleavage_motif_re(GLP["cleavage_motif"])

    fixed_mod_map = dict((r[3], r) for r in GLP["pervasive_mods"])
    # FIX: is this even necessary?
    regime_manifest = initialize_spectrum_parameters(options, GLP,
                                                     GLP["mass_regimes"],
                                                     fixed_mod_map)
    info("reading sequence databases")
    # read sequence dbs
    databases = GLP["databases"].split()
    # [(locusname, defline, seq, filename), ...]
    fasta_db = list(read_fasta_files(databases))

    # This is what we'll send to chasers.  Either (1) the already parsed
    # contents of the databases, which is slow to send, or (2) a list of
    # (database name, sha1 checksums), which requires that the files exist on
    # the chaser hosts.
    db_representation = ("value", fasta_db)
    if 1:                               # FIX: make this an option?
        db_representation = ("name",
                             [ (db, file_sha1(db)) for db in databases ])

    # FIX: is this worth doing here?
    #info("checking spectrum files")
    #check_ms2_files(spectrum_fns)
    #info("spectrum files okay")

    spectrum_count = count_ms2_spectra(spectrum_fns)

    if not spectrum_count:
        error("no input spectra")
    info("counted %s spectra", spectrum_count)

    # FIX: gather these statistics from chasers? (!)
    # before/after filtering...
    # print_spectrum_statistics(spectra)

    # now connect to chasers and loop until done (or timeout)
    chase_client.set_parameters(GLP)
    chase_client.set_sequences(db_representation)
    chase_client.set_spectra(generate_spectra_from_ms2_files(spectrum_fns),
                             spectrum_count)
    chase_client.set_result_file(temp_result_file)

    # Trivial strategy: Hand each client 1/10th of their likely workload in
    # each batch.  If client crashes, we haven't lost too much work.
    # Making this value larger keeps polling efficient and lets us handle a
    # large number of clients.  If it's too large, though, we're need more RAM
    # for temporary storage of pythonized spectra.
    chase_client.spectrum_batch_size = min(100, max(1, int(0.1 * spectrum_count
                                                           / len(hosts))))

    # FIX: this is actually faster (?)
    #chase_client.spectrum_batch_size = 1

    # adjust for performance
    chase_client.ac_in_buffer_size = 62000
    chase_client.ac_out_buffer_size = 62000
    #chase_client.ac_in_buffer_size = 620000
    #chase_client.ac_out_buffer_size = 620000

    # one client per listed worker; failures land on dead_clients for retry
    for host, port in hosts:
        chase_client(host, port)

    # retry time for dead clients (in seconds)
    retryafter = 60

    start_time = time.time()
    laststatus = 0
    while True:
        asyncore.loop(count=1, timeout=retryafter, use_poll=True)
        # empty (not None) queue means every spectrum has been retired
        if chase_client.spectrum_queue == []:
            break

        now = time.time()

        # periodic progress/ETA report, at most every ten seconds
        if now - laststatus >= 10:
            eta_minutes = ((now - start_time)
                           / (spectrum_count - chase_client.spectra_to_go
                              + 0.1)
                           * chase_client.spectra_to_go / 60.0)
            info("%s spectra to search, %s chasers, ETA %dm",
                 chase_client.spectra_to_go, len(asyncore.socket_map),
                 int(eta_minutes))
            laststatus = now

        # resurrect workers that have been dead at least retryafter seconds
        while (chase_client.dead_clients
               and chase_client.dead_clients[0][0] < now - retryafter):
            debug("died %s restart %s", chase_client.dead_clients[0][0], now)
            dt, host, port = heapq.heappop(chase_client.dead_clients)
            chase_client(host, port)
        if not asyncore.socket_map and chase_client.dead_clients:
            # nothing connected: sleep until the next retry comes due
            debug("sleeping")
            time.sleep(max(0, retryafter - (now
                                            - chase_client.dead_clients[0][0])))
        # failsafe, better performance?
        #time.sleep(0.1)


    # FIX: could close all client sockets here--worth doing?

    # FIX
    #total_comparisons = sum(ss.candidate_spectrum_count
    #                        for ss in score_statistics)
    #if options.estimate_only:
    #    # divisor is just a slightly informed guess
    #    print ("%.2f generic CPU hours" % (total_comparisons / 439.0e6))
    #    return

    info("writing result file '%s'", result_fn)

    pk = pickle.Pickler(result_file, pickle.HIGHEST_PROTOCOL)
    pk.fast = 1                         # stipulate no circular references
    mod_conjunct_triples = get_mod_conjunct_triples(GLP["potential_mods"],
                                                    GLP["potential_mod_limit"],
                                                    GLP["mass_regimes"])
    # header record describing this search run
    pk.dump({ 'version' : VERSION,
              'total comparisons' : 0,
              'spectrum files' : base_spectrum_fns,
              'databases' : databases,
              'parameters' : GLP,
              'mass regime atomic masses' : MASS_REGIME_ATOMIC_MASSES,
              'mass regime manifest' : sorted(regime_manifest),
              'proton mass' : PROTON_MASS,
              'modification conjuncts' : mod_conjunct_triples,
              'argv' : sys.argv })

    # copy the pickled results from the temp file, ordered by spectrum id
    temp_result_file.flush()            # unneeded?
    for sp_id, offset, length in sorted(chase_client.result_file_index):
        temp_result_file.seek(offset)
        r = temp_result_file.read(length)
        assert len(r) == length
        result_file.write(r)

    # sp = (file_index, physical_index, index, name, mass, charge, peaks)
    #result_matches = dict(((sp[0], sp[3]), chase_client.results[sp[2]])
    #                      for sp in spectra if chase_client.results[sp[2]])

    pk.dump('complete')                 # so reader can tell file not truncated
    result_file.close()                 # force any file error to occur now
    info("finished")
if __name__ == '__main__':
#     try:
#         import psyco
#         psyco.full()
#         ###psyco.bind(formula_mass)
#         warning('psyco enabled')
#     except ImportError:
#         pass

    try:
        if '--profile' in sys.argv:
            # run under the profiler, writing a two-way sorted report
            import cProfile
            import pstats
            report_fn = "greylag-rally.prof.%s" % os.getpid()
            data_fn = report_fn + ".tmp"
            prof = cProfile.run('main()', data_fn)
            with open(report_fn, 'w') as report_f:
                try:
                    stats = pstats.Stats(data_fn, stream=report_f)
                    stats.strip_dirs()
                    stats.sort_stats('cumulative')
                    stats.print_stats(50)
                    stats.sort_stats('time')
                    stats.print_stats(50)
                    print "# profile report written to '%s'" % report_fn
                finally:
                    # raw profile data is disposable once the report exists
                    try:
                        os.remove(data_fn)
                    except:
                        pass
        else:
            main()
    except SystemExit:
        raise
    except:
        # last-ditch handler: record the traceback and exit nonzero
        logging.exception("unhandled exception")
        sys.exit(1)
    finally:
        logging.shutdown()