#!/usr/bin/env python

'''This program coordinates the search of mass spectra by sending subsearches
to running instances of greylag-chase, which are expected to be listening on
the specified hosts/ports.

The job id (given with --job-id) is a unique identifier used as a prefix for
work files. The <configuration-file> contains program options. The spectra
are in the <ms2-file>s.

'''
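
# Typical invocation (illustrative, not taken from the original docs):
#
#     greylag-rally --job-id=myjob greylag.conf spectra/*.ms2
#
# which, per main() below, leaves results in 'chase_myjob.glw'.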

from __future__ import with_statement

__copyright__ = '''
    greylag, a collection of programs for MS/MS protein analysis
    Copyright (C) 2006-2008  Stowers Institute for Medical Research

    This program is free software: you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
    the Free Software Foundation, either version 3 of the License, or
    (at your option) any later version.

    This program is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU General Public License for more details.

    You should have received a copy of the GNU General Public License
    along with this program.  If not, see <http://www.gnu.org/licenses/>.

    Contact: Mike Coleman
             Stowers Institute for Medical Research
             1000 East 50th Street
             Kansas City, Missouri  64110
             USA
'''


import asynchat
import asyncore
import ConfigParser
import cPickle as pickle
import errno
import exceptions
import heapq
import itertools
import logging; from logging import debug, info, warning
import optparse
import os
from pprint import pprint, pformat
import re
import socket
import sys
import tempfile
import time

from greylag import *


# gc possibly harms performance here, so disable it.  gc only matters for
# cycles, which we (hope we) don't create.  See the gc module docs.
import gc; gc.disable()


def error(s, *args):
    "fatal error"
    # if we're unit testing, just throw an exception
    if __name__ != "__main__":
        raise Exception((s + " (fatal error)") % args)
    logging.error(s, *args)
    sys.exit(1)

# fatal for rally, but chase just disconnects
chase_error = error

# name -> value map of processed greylag config parameters
GLP = {}


# FIX: This parsing is way too complex.  How to simplify?

def mass_regime_part(part_specification):
    """Parse a single mass regime specification part.

    >>> mass_regime_part('MONO')
    ('MONO', [])
    >>> mass_regime_part('AVG')
    ('AVG', [])
    >>> mass_regime_part('MONO(N15@87.5%)')
    ('MONO', [('N15', 0.875)])

    """
    ps = [ x.strip() for x in part_specification.partition('(') ]
    if ps[0] not in ('MONO', 'AVG'):
        raise ValueError("invalid mass regime list specification"
                         " (regime id must be 'MONO' or 'AVG')")
    if not ps[1]:
        return (ps[0], [])
    if ps[2][-1] != ')':
        raise ValueError("invalid mass regime list specification"
                         " (expected ')')")
    pps = [ x.strip() for x in ps[2][:-1].split(',') ]
    if len(pps) > 1:
        raise ValueError("invalid mass regime list specification"
                         " (multiple isotopes not yet implemented)")
    ppps = [ x.strip() for x in pps[0].partition('@') ]
    if not ppps[1]:
        raise ValueError("invalid mass regime list specification"
                         " (expected '@')")
    if ppps[0] not in ('N15',):
        raise ValueError("invalid mass regime list specification"
                         " (isotope id must currently be 'N15')")
    if ppps[2][-1] != '%':
        raise ValueError("invalid mass regime list specification"
                         " (expected '%')")
    prevalence = float(ppps[2][:-1]) / 100
    if not (0 <= prevalence <= 1):
        raise ValueError("invalid mass regime list specification"
                         " (prevalence must be in range 0-100%)")
    return (ps[0], [(ppps[0], prevalence)])


def mass_regime_list(mass_regime_list_specification):
    """Check and return a list of regime tuples (parent_regime,
    fragment_regime), where each regime is a tuple (id, [(isotope_id,
    prevalence), ...]).  Multiple isotopes (when implemented) would be
    comma-separated.

    >>> pprint(mass_regime_list('AVG/MONO;MONO;MONO(N15@75%)'))
    [[('AVG', []), ('MONO', [])],
     [('MONO', []), ('MONO', [])],
     [('MONO', [('N15', 0.75)]), ('MONO', [('N15', 0.75)])]]

    """
    result = []
    for regspec in mass_regime_list_specification.split(';'):
        halves = [ x.strip() for x in regspec.split('/') ]
        if len(halves) > 2:
            raise ValueError("invalid mass regime list specification"
                             " (too many '/'?)")
        pr = [ mass_regime_part(h) for h in halves ]
        if len(pr) == 1:
            pr = [ pr[0], pr[0] ]
        result.append(pr)

    # The first fragmentation regime should generally be MONO, so that
    # formulaic deltas with '!' do the expected thing.
    if result[0][1] != ('MONO', []):
        raise ValueError("first fragmentation regime was something other than"
                         " 'MONO' with no isotopes--this is almost certainly"
                         " not what was intended")
    return result


def parse_mod_term(s, is_potential=False):
    """Parse a modification term, returning a tuple (sign, mod, fixed_regime,
    residues, description, marker).  The marker character must be printable
    and non-alphabetic.

    >>> parse_mod_term('-C2H3ON!@C')
    (-1, 'C2H3ON', True, 'C', None, None)
    >>> parse_mod_term("42@STY phosphorylation '*'", is_potential=True)
    (1, 42.0, False, 'STY', 'phosphorylation', '*')

    """
    m = re.match(r'^\s*(-|\+)?(([1-9][0-9.]*)|([A-Z][A-Z0-9]*))(!?)'
                 r'@([A-Z]+|\[|\])(\s+([A-Za-z0-9_]+))?'
                 r"(\s+'([[-`!-@{-~])')?\s*$", s)
    if not m:
        raise ValueError("invalid modification term specification"
                         " '%s'" % s)
    mg = m.groups()
    invalid_residues = set(mg[5]) - set(RESIDUES_W_BRACKETS)
    if invalid_residues:
        raise ValueError("invalid modification list specification"
                         " (invalid residues %s)" % list(invalid_residues))
    delta = mg[1]
    if mg[2]:
        delta = float(mg[1])
        if is_potential and abs(delta) < 0.0001:
            raise ValueError("invalid modification list specification"
                             " (delta '%s' is too small)" % delta)
    residues = mg[5]
    if not is_potential and len(residues) != 1:
        raise ValueError("invalid modification list specification '%s' (only"
                         " potential modifications may have multiple residues)"
                         % residues)
    if len(residues) != len(set(residues)):
        raise ValueError("invalid modification list specification"
                         " '%s' (duplicate residues prohibited)"
                         % residues)
    return (1 if mg[0] != '-' else -1, delta, mg[4] == '!', residues, mg[7],
            mg[9])


def fixed_mod_list(specification):
    """Check and return a list of modification tuples.

    >>> fixed_mod_list('57@C')
    [(1, 57.0, False, 'C', None, None)]
    >>> fixed_mod_list("57@C '*',CH!@N desc")
    [(1, 57.0, False, 'C', None, '*'), (1, 'CH', True, 'N', 'desc', None)]
    >>> fixed_mod_list('57@C,58@C')
    Traceback (most recent call last):
        ...
    ValueError: invalid modification list specification '['C', 'C']' (duplicate residues prohibited)

    """
    if not specification:
        return []
    result = [ parse_mod_term(s) for s in specification.split(',') ]
    residues = [ x[3] for x in result ]
    # this check is across terms; the one above is within terms
    if len(residues) != len(set(residues)):
        raise ValueError("invalid modification list specification"
                         " '%s' (duplicate residues prohibited)"
                         % residues)
    debug("fixed_mod_list:\n%s", pformat(result))
    return result


def parse_mod_basic_expression(s):
    """Parse s looking for either a term or a parenthesized subexpression.
    Return the pair (tree of modification tuples, unparsed suffix of s).
    """
    s = s.strip()
    if s[0] == '(':
        tree, rest = parse_mod_disjunction(s[1:])
        rest = rest.lstrip()
        if rest[:1] != ')':
            raise ValueError("invalid modification list specification"
                             " (expected matching ')')")
        return tree, rest[1:]
    parts = re.split(r'([;,()])', s, 1)
    if len(parts) == 1:
        term, rest = s, ''
    else:
        assert len(parts) == 3
        term, rest = parts[0], parts[1]+parts[2]
    return parse_mod_term(term, is_potential=True), rest

def parse_mod_conjunction(s):
    """Parse s looking for a ','-separated sequence of subexpressions.  Return
    the pair (tree of modification tuples, unparsed suffix of s).
    """
    result = []
    while s:
        tree, s = parse_mod_basic_expression(s)
        result.append(tree)
        s = s.lstrip()
        if s[:1] != ',':
            break
        s = s[1:]
    return result, s

def parse_mod_disjunction(s):
    """Parse s looking for a ';'-separated sequence of subexpressions.  Return
    the pair (tree of modification tuples, unparsed suffix of s).
    """
    result = []
    while s:
        tree, s = parse_mod_conjunction(s)
        result.append(tree)
        s = s.lstrip()
        if s[:1] != ';':
            break
        s = s[1:]
    return result, s
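
# Taken together, the three functions above form a tiny recursive-descent
# parser.  As a grammar sketch (inferred from the code, not from any greylag
# documentation):
#
#     disjunction ::= conjunction (';' conjunction)*
#     conjunction ::= basic (',' basic)*
#     basic       ::= '(' disjunction ')' | modification-term
#
# so ';' binds more loosely than ',', and parentheses group, as the
# potential_mod_list doctests below illustrate.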

def potential_mod_list(specification):
    """Check and return a tree of potential modification tuples.  Nodes at
    even (odd) levels are disjunctions (conjunctions).  (The top list is a
    disjunction.)

    >>> potential_mod_list('')
    []
    >>> pprint(potential_mod_list('PO3H@STY; C2H2O@KST'))
    [[(1, 'PO3H', False, 'STY', None, None)],
     [(1, 'C2H2O', False, 'KST', None, None)]]
    >>> pprint(potential_mod_list('PO3H@STY, C2H2O@KST'))
    [[(1, 'PO3H', False, 'STY', None, None),
      (1, 'C2H2O', False, 'KST', None, None)]]
    >>> pprint(potential_mod_list('''(PO3H@STY phosphorylation '*';
    ...                               C2H2O@KST acetylation '^';
    ...                               CH2@AKST methylation '#'),
    ...                              O@M oxidation '@'
    ...                              '''))
    [[[[(1, 'PO3H', False, 'STY', 'phosphorylation', '*')],
       [(1, 'C2H2O', False, 'KST', 'acetylation', '^')],
       [(1, 'CH2', False, 'AKST', 'methylation', '#')]],
      (1, 'O', False, 'M', 'oxidation', '@')]]

    """
    if not specification:
        return []
    tree, remainder = parse_mod_disjunction(specification)
    if remainder:
        raise ValueError("invalid modification list specification"
                         " (unexpected '%s')" % remainder)
    debug("potential_mod_list:\n%s", pformat(tree))
    return tree


def p_positive(x): return x > 0
def p_negative(x): return x < 0
def p_nonnegative(x): return x >= 0
def p_proportion(x): return 0 <= x <= 1

# Configuration file parameter specification
#
# name -> (type, default, check_fn)
# type may be bool, int, float, str, modlist, or a tuple of values
# default, as a string value, or None if value must be explicitly specified
# check_fn is an optional function that returns True iff the value is valid

PARAMETER_INFO = {
    "databases" : (str, None),
    "mass_regimes" : (mass_regime_list, "MONO"),
    "mass_regime_debug_delta" : (float, 0),
    "pervasive_mods" : (fixed_mod_list, ""),
    "potential_mods" : (potential_mod_list, ""),
    "potential_mod_limit" : (int, 2, p_nonnegative),
    "enable_pca_mods" : (bool, "yes"),
    "charge_limit" : (int, 3, p_positive),
    "min_peptide_length" : (int, 5, p_positive),
    "cleavage_motif" : (str, "[X]|[X]"),
    "maximum_missed_cleavage_sites" : (int, 1, p_nonnegative),
    "min_parent_spectrum_mass" : (float, 0, p_nonnegative),
    "max_parent_spectrum_mass" : (float, 10000, p_nonnegative),
    "TIC_cutoff_proportion" : (float, 0.98, p_proportion),
    "parent_mz_tolerance" : (float, 1.25, p_nonnegative),
    "fragment_mass_tolerance" : (float, 0.5, p_nonnegative),
    "intensity_class_count" : (int, 3, p_positive),
    "intensity_class_ratio" : (float, 2.0, p_positive), # really > 1.0?
    "best_result_count" : (int, 5, p_positive),
    }


def validate_parameters(parameters, parameter_info=PARAMETER_INFO):
    """Verify that parameters are valid, have valid values, and correspond to
    currently implemented functionality.  Values are converted, default values
    are filled in, and the resulting name/value dict returned.

    >>> sorted(validate_parameters({'spectrum, total peaks' : '40'},
    ...                            {'spectrum, total peaks'
    ...                             : (int, '50', p_positive),
    ...                             'output, spectra'
    ...                             : (bool, 'no')}).items())
    [('output, spectra', False), ('spectrum, total peaks', 40)]

    """
    pmap = {}
    for p_name, p_info in sorted(parameter_info.items()):
        type_ = p_info[0]
        default = p_info[1]
        check_fn = p_info[2] if len(p_info) > 2 else None

        v = parameters.get(p_name)
        if v == None:
            if default != None:
                debug("parameter '%s' defaulting to '%s'", p_name, default)
                v = default
            else:
                error("missing required parameter '%s'" % p_name)
        if isinstance(type_, tuple):
            if not v in type_:
                error("parameter '%s' value '%s' not in %s (feature not"
                      " implemented?)" % (p_name, v, type_))
        elif type_ == bool:
            v = { 'yes' : True, 'no' : False }.get(v)
            if v == None:
                error("parameter '%s' requires a value of 'yes' or 'no'"
                      % p_name)
        else:
            try:
                v = type_(v)
            except ValueError, e:
                error("parameter '%s' has value '%s' with invalid format [%s]"
                      % (p_name, v, e))
        if check_fn and not check_fn(v):
            error("parameter '%s' has invalid value '%s' (or feature not"
                  " implemented)" % (p_name, v))
        pmap[p_name] = v

    unknown_parameters = set(parameters) - set(parameter_info)
    if unknown_parameters:
        warning("%s unknown parameter(s):\n %s", len(unknown_parameters),
                pformat(sorted(list(unknown_parameters))))
    return pmap


def file_contents(filename):
    """Return the contents of the file."""
    with open(filename) as f:
        return f.read()


# about 100/s, probably fast enough (just transmitting takes longer?)
def generate_spectra_from_ms2_files(spectrum_fns):
    """Read ms2 files, yielding tuples (file_index, physical_index, index,
    name, mass, charge, peaks).  The indexes are 0-based.  Mass and charge are
    returned as strings, and peaks is the newline-embedded peak block from the
    file.
    """
    header_re = re.compile(r'^:', re.MULTILINE)

    # check open-ability of all files immediately, to avoid surprises later
    for fn in spectrum_fns:
        open(fn).close()

    physical_index = 0
    index = 0
    pending_headers = []
    for file_index, spfn in enumerate(spectrum_fns):
        contents = file_contents(spfn)
        for block in header_re.split(contents)[1:]:
            b = block.split('\n', 2)
            if len(b) != 3:
                error("malformed ms2 file '%s' (malformed header)" % spfn)
            name, h_line1, peaks = b
            h = h_line1.split(' ', 1)
            if len(h) != 2:
                error("malformed ms2 file '%s' (malformed mass/charge header)"
                      % spfn)
            mass, charge = h
            pending_headers.append((name, mass, charge))
            if peaks:
                for name, mass, charge in pending_headers:
                    yield (file_index, physical_index, index,
                           name, mass, charge, peaks)
                    index += 1
                pending_headers = []
                physical_index += 1
        if pending_headers:
            error("malformed ms2 file '%s' (headers at end?)" % spfn)

def count_ms2_spectra(spectrum_fns):
    """Return count of ms2 spectra, for ETA calculation, etc."""
    return sum(file_contents(fn).count(':') for fn in spectrum_fns)


# This uses too much RAM...
# def check_ms2_files(spectrum_fns):
#     """Verify that the spectrum files are well-formed, calling error if not."""
#
#     ms2_re = re.compile(r'''
#                         ^(
#                         (:.*\n
#                         [1-9]\d*\.\d+\ [1-9]\d*\r?\n)+
#                         (\d+\.\d+\ \d+(\.\d+)?\r?\n)+
#                         )+$
#                         ''', re.VERBOSE)
#
#     for file_index, spfn in enumerate(spectrum_fns):
#         with open(spfn) as specfile:
#             contents = specfile.read()
#         info("SKIP checking '%s' (%s bytes)", spfn, len(contents))
#         #if not ms2_re.match(contents):
#         #    error("invalid ms2 file '%s'", spfn)

# FIX: optionally print peak statistics?  (slow?)
# def peak_statistics(spectra):
#     counts = [ len(sp.peaks) for sp in spectra ]
#     counts.sort()
#     n = len(counts)
#     return (counts[0], counts[int(n*0.25)], counts[int(n*0.5)],
#             counts[int(n*0.75)], counts[-1], sum(counts) / float(n))
#
# def print_spectrum_statistics(spectra):
#     info("  %s spectra (mass range %s - %s)", len(spectra),
#          spectra[0].mass, spectra[-1].mass)
#     info("  peak stats: %s..%s..%s..%s..%s (mean=%.2f)"
#          % peak_statistics(spectra))
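

# Wire protocol between rally and its chasers, as implemented by chase_client
# below (a sketch inferred from _send_command and found_terminator, not a
# separate specification):
#
#     chaser: <banner line, e.g. "greylag <version> ready">
#     rally:  parameters <pickle-length>\n<pickled parameters>
#     rally:  sequences <pickle-length>\n<pickled db representation>
#     rally:  spectra <pickle-length>\n<pickled spectrum batch>
#     rally:  search <pickle-length>\n<pickled None>
#     chaser: found <result-length>\n<pickled result>   (or "error ...")
#
# after which rally submits the next batch on the same connection.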

class chase_client(asynchat.async_chat):

    # heapq of (deathtime, host, port) for connections that have died or
    # timed-out.  deathtime is the time of death.  It's up to external code to
    # recreate after a suitable wait, if desired.
    dead_clients = []

    # cached so that we don't have thousands of identical copies
    pickled_parameters = None
    pickled_sequences = None

    spectrum_generator = None

    # (host, port) -> (submittime, [ spectrum, ... ])
    # tracks the spectra each chase_client is currently searching
    spectra_in_progress = {}

    # heapq of (lastsubmittime, spectrum) for spectra submitted (at least
    # once) and (possibly) not yet completed.  lastsubmittime is provided so
    # that we can submit the least recently submitted spectrum next.
    # Not populated until spectrum_generator has been exhausted (to avoid
    # keeping lots of spectra in memory).
    spectrum_queue = None

    # set of spectrum_no's of spectra already searched (only those searched
    # since we populated spectrum_queue)
    searched_spectra = set()

    # number of spectra for which results not yet received
    spectra_to_go = None

    # The number of spectra submitted to clients for a search command.  This
    # can be adjusted upwards for efficiency.
    spectrum_batch_size = 1

    # file on which to write pickled results
    result_file = None
    # index of result_file
    # [ (spectrum_id, offset, length), ... ]
    result_file_index = []

    @classmethod
    def set_parameters(self, parameters):
        assert self.pickled_parameters == None, "no change once set"
        self.pickled_parameters = pickle.dumps(parameters)

    @classmethod
    def set_sequences(self, sequences):
        assert self.pickled_sequences == None, "no change once set"
        self.pickled_sequences = pickle.dumps(sequences)

    @classmethod
    def set_result_file(self, result_file):
        assert self.result_file == None, "no change once set"
        self.result_file = result_file

    @classmethod
    def set_spectra(self, spectrum_generator, spectra_to_go):
        """Fix the list of spectra to be searched, which must be done before
        any clients are created (and cannot be changed later).
        """
        assert self.spectrum_generator == None, "no change once set"
        self.spectrum_generator = spectrum_generator
        self.spectra_to_go = spectra_to_go

    @classmethod
    def _remember_result(self, spectrum_id, result):
        if result:
            offset = self.result_file.tell()
            del result[0]['id']         # ids differ for future subset files
            pickle.dump(result, self.result_file, pickle.HIGHEST_PROTOCOL)
            length = self.result_file.tell() - offset
            self.result_file_index.append((result[0]['name'], offset, length))
        assert spectrum_id not in self.searched_spectra
        self.searched_spectra.add(spectrum_id)
        self.spectra_to_go -= 1


    def __init__(self, host, port):
        asynchat.async_chat.__init__(self)
        assert self.pickled_parameters and self.pickled_sequences
        assert self.spectrum_generator
        self.host = host
        self.port = port

        self.ibuffer = []
        self.set_terminator('\n')

        self.banner = None

        self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
        self.set_keep_alive()
        self.connect((host, port))

    # override asynchat version to use a reasonable buffer size
    def push(self, data):
        self.producer_fifo.push(
            asynchat.simple_producer(data,
                                     buffer_size=self.ac_out_buffer_size))
        self.initiate_send()

    def __repr__(self):
        return "<%s connected to %s:%s>" % (self.__class__.__name__,
                                            self.host, self.port)


    def set_keep_alive(self):
        # try to better notice reboots, net failures, etc
        try:
            self.socket.setsockopt(
                socket.SOL_SOCKET, socket.SO_KEEPALIVE,
                self.socket.getsockopt(socket.SOL_SOCKET,
                                       socket.SO_KEEPALIVE) | 1
                )
        except socket.error:
            pass

    def _send_command(self, command, pickled_argument):
        self.push(command + ' ')
        self.push("%s\n" % len(pickled_argument))
        self.push(pickled_argument)

    def _submit_search(self):
        "send a search to client"

        batch_size = self.spectrum_batch_size

        # start slow, to get chasers going immediately, then increase batch
        # size to increase master efficiency (and level load)
        #batch_size = min(self.spectrum_batch_size,
        #                 4**min(self.submit_count-1, 10))

        #if self.spectrum_queue:
        #    # on each retry round, divide batch size by four
        #    batch_size = max(1, (batch_size
        #                         / 2**min(64, 2*self.spectrum_queue[0][0])))

        # get some never-searched spectra, if there are any
        submit_spectra = list(itertools.islice(self.spectrum_generator,
                                               batch_size))
        if submit_spectra:
            self.spectra_in_progress[(self.host, self.port)] = (time.time(),
                                                                submit_spectra)
        else:
            if self.spectrum_queue == None:
                # move spectra_in_progress to spectrum_queue
                self.__class__.spectrum_queue = []
                for t, subsp in self.spectra_in_progress.values():
                    for sp in subsp:
                        heapq.heappush(self.spectrum_queue, (t, sp))
                self.__class__.spectra_in_progress = None

            # generate submit_spectra from spectrum_queue
            while self.spectrum_queue and len(submit_spectra) < batch_size:
                _submittime, spectrum = heapq.heappop(self.spectrum_queue)
                # spectrum = (file_index, physical_index, index, name, mass,
                #             charge, peaks)
                if spectrum[2] in self.searched_spectra:
                    continue            # already have search result
                heapq.heappush(self.spectrum_queue, (time.time(), spectrum))
                submit_spectra.append(spectrum)

        if submit_spectra:
            debug("submitting %s spectra (%s not yet retired)",
                  len(submit_spectra), self.spectra_to_go)
            self._send_command('spectra', pickle.dumps(submit_spectra))
            self._send_command('search', pickle.dumps(None))
        else:
            self.close()
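
    # Spectrum lifecycle sketch (inferred from the methods above and below):
    # spectra flow from spectrum_generator into spectra_in_progress while
    # fresh work remains; once the generator is exhausted, everything still
    # outstanding moves to spectrum_queue, a heap keyed by last submit time,
    # so retries always pick the least recently submitted spectrum; a
    # spectrum finally lands in searched_spectra when _remember_result sees
    # its result.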

    def _receive_response(self):
        r = ''.join(self.ibuffer)
        self.ibuffer = []
        return r


    def handle_connect(self):
        debug("connecting to %s:%s", self.host, self.port)

    def handle_expt(self):
        pass

    def handle_error(self):
        #self.handle_close()
        exc, why, _traceback = sys.exc_info()
        self.connected = False
        if exc == exceptions.KeyboardInterrupt: # FIX: works?
            error("received keyboard interrupt")
        if exc == socket.error:
            if why[0] == errno.ECONNREFUSED:
                debug("no chaser at %s:%s (connection refused)", self.host,
                      self.port)
            else:
                info("network error on connection %s:%s (%s %s)", self.host,
                     self.port, exc, why)
                debug("  traceback: %s", asyncore.compact_traceback()[3])
        else:
            info("unexpected error on connection %s:%s (%s %s)", self.host,
                 self.port, exc, why)
            info("  traceback: %s", asyncore.compact_traceback()[3])

    def handle_close(self):
        # FIX
        #self.__class__.dead_clients.append((time.time(), self.host, self.port))
        self.dead_clients.append((time.time(), self.host, self.port))
        asynchat.async_chat.handle_close(self)


    def collect_incoming_data(self, data):
        self.ibuffer.append(data)

    def found_terminator(self):
        if self.banner == None:
            self.banner = self._receive_response()
            banner_words = self.banner.split()
            if banner_words[0] != 'greylag':
                warning("%s:%s is not a greylag-chase", self.host, self.port)
                self.handle_close()
                return
            if banner_words[2:3] != ['ready']:
                debug("greylag-chase %s:%s is serving %s", self.host,
                      self.port, banner_words[3:4])
                self.handle_close()
                return
            # now we can start talking
            self._send_command('parameters', self.pickled_parameters)
            self._send_command('sequences', self.pickled_sequences)
            self._submit_search()
            return

        if self.get_terminator() == '\n':
            reply = self._receive_response()
            reply_words = reply.split()
            if reply_words[0] == 'error':
                warning("%s:%s gave '%s'", self.host, self.port, reply)
                self.handle_close()
                return

            assert reply_words[0] == 'found' and len(reply_words) == 2
            self.set_terminator(int(reply_words[1]))
            return

        result = pickle.loads(self._receive_response())

        #debug("received: %s" % result)

        # FIX: check that multiply searched spectra had same results
        for noise_sp_id in result['noise_spectrum_ids']:
            if noise_sp_id not in self.searched_spectra:
                self._remember_result(noise_sp_id, False)

        for sp_id, sp_matches in result['matches'].iteritems():
            if sp_id not in self.searched_spectra:
                self._remember_result(sp_id, sp_matches)

        self.set_terminator('\n')
        self._submit_search()


def main(args=sys.argv[1:]):
    parser = optparse.OptionParser(usage=
                                   "usage: %prog [options]"
                                   " <configuration-file> <ms2-file>...",
                                   description=__doc__, version=VERSION)
    pa = parser.add_option
    DEFAULT_HOSTFILE = "/etc/greylag/hosts"
    pa("--hostfile", dest="hostfile", default=DEFAULT_HOSTFILE,
       help="file listing host:port locations where greylag-chase workers are"
       " listening.  [default '%s']" % DEFAULT_HOSTFILE)
    pa("--job-id", dest="job_id", default="unknown",
       help="used to generate unique output filenames [default 'unknown']")
    #pa("--estimate", action="store_true", dest="estimate_only",
    #   help="just estimate the time required for the search")
    pa("-v", "--verbose", action="store_true", dest="verbose",
       help="be verbose")
    pa("-q", "--quiet", action="store_true", dest="quiet", help="no warnings")
    pa("-l", "--logfile", dest="logfile",
       help="log to FILE instead of stderr", metavar="FILE")
    pa("-P", "--parameter", dest="parameters", action="append",
       default=[],
       help="override a parameter in <configuration-file>, may be used"
       " multiple times", metavar="NAME=VALUE")
    pa("--copyright", action="store_true", dest="copyright",
       help="print copyright and exit")
    pa("--debug", action="store_true", dest="debug",
       help="output debugging info")
    pa("--profile", action="store_true", dest="profile",
       help="dump Python profiling output to './greylag-rally.prof.<pid>'")
    (options, args) = parser.parse_args(args=args)

    if options.copyright:
        print __copyright__
        sys.exit(0)

    if len(args) < 2:
        parser.print_help()
        sys.exit(1)

    configuration_fn = args[0]
    spectrum_fns = args[1:]

    if (any(True for f in spectrum_fns if not f.endswith('.ms2'))
        or any(True for p in options.parameters if '=' not in p)):
        parser.print_help()
        sys.exit(1)

    set_logging(options)

    info("starting on %s", socket.gethostname())

    # check early that we can open result files
    result_fn = 'chase_%s.glw' % options.job_id
    result_file = open(result_fn, 'wb')
    temp_result_file = tempfile.TemporaryFile(dir='.', prefix=result_fn+'-')

    # read chaser (host, port) listener list
    try:
        with open(options.hostfile) as hostf:
            hosts = [ l.split(':', 1) for l in hostf ]
            hosts = [ (host, int(port)) for host, port in hosts ]
    except ValueError:
        error("invalid or empty host line in '%s'" % options.hostfile)

    if not hosts:
        error("no valid search hosts specified")

    # check spectrum basename uniqueness, as corresponding sqt files will be
    # in a single directory
    base_spectrum_fns = [ os.path.basename(fn) for fn in spectrum_fns ]
    if len(base_spectrum_fns) != len(set(base_spectrum_fns)):
        error("base spectrum filenames must be unique")

    options.parameters = [ p.split('=', 1) for p in options.parameters ]

    # check -P names for validity
    bad_names = (set(n for n,v in options.parameters)
                 - set(PARAMETER_INFO.keys()))
    if bad_names:
        error("bad -P parameter names %s" % list(bad_names))

    # read params
    cp = ConfigParser.RawConfigParser()
    cp.optionxform = str                # be case-sensitive
    with open(configuration_fn) as configuration_file:
        cp.readfp(configuration_file)
    if not cp.has_section('greylag'):
        error("%s has no [greylag] section" % configuration_fn)
    parameters = dict(cp.items('greylag'))
    parameters.update(dict(options.parameters)) # command-line override
    global GLP
    GLP = validate_parameters(parameters)

    # test parse cleavage motif here, so we notice errors early
    cleavage_motif_re(GLP["cleavage_motif"])

    fixed_mod_map = dict((r[3], r) for r in GLP["pervasive_mods"])
    # FIX: is this even necessary?
    regime_manifest = initialize_spectrum_parameters(options, GLP,
                                                     GLP["mass_regimes"],
                                                     fixed_mod_map)
    info("reading sequence databases")
    # read sequence dbs
    databases = GLP["databases"].split()
    # [(locusname, defline, seq, filename), ...]
    fasta_db = list(read_fasta_files(databases))

    # This is what we'll send to chasers.  Either (1) the already parsed
    # contents of the databases, which is slow to send, or (2) a list of
    # (database name, sha1 checksum) pairs, which requires that the files
    # exist on the chaser hosts.
    db_representation = ("value", fasta_db)
    if 1:                               # FIX: make this an option?
        db_representation = ("name",
                             [ (db, file_sha1(db)) for db in databases ])

    # FIX: is this worth doing here?
    #info("checking spectrum files")
    #check_ms2_files(spectrum_fns)
    #info("spectrum files okay")

    spectrum_count = count_ms2_spectra(spectrum_fns)

    if not spectrum_count:
        error("no input spectra")
    info("counted %s spectra", spectrum_count)

    # FIX: gather these statistics from chasers?  (!)
    # before/after filtering...
    # print_spectrum_statistics(spectra)

    # now connect to chasers and loop until done (or timeout)
    chase_client.set_parameters(GLP)
    chase_client.set_sequences(db_representation)
    chase_client.set_spectra(generate_spectra_from_ms2_files(spectrum_fns),
                             spectrum_count)
    chase_client.set_result_file(temp_result_file)

    # Trivial strategy: Hand each client 1/10th of their likely workload in
    # each batch.  If a client crashes, we haven't lost too much work.
    # Making this value larger keeps polling efficient and lets us handle a
    # large number of clients.  If it's too large, though, we'll need more
    # RAM for temporary storage of pythonized spectra.
    chase_client.spectrum_batch_size = min(100, max(1, int(0.1 * spectrum_count
                                                           / len(hosts))))
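    # (e.g. 20000 spectra spread over 10 chaser hosts gives
    # min(100, max(1, int(0.1 * 20000 / 10))) = min(100, 200) = 100 spectra
    # per batch; the cap of 100 dominates for all but small jobs)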

    # FIX: this is actually faster (?)
    #chase_client.spectrum_batch_size = 1

    # adjust for performance
    chase_client.ac_in_buffer_size = 62000
    chase_client.ac_out_buffer_size = 62000
    #chase_client.ac_in_buffer_size = 620000
    #chase_client.ac_out_buffer_size = 620000

    for host, port in hosts:
        chase_client(host, port)

    # retry time for dead clients (in seconds)
    retryafter = 60

    start_time = time.time()
    laststatus = 0
    while True:
        asyncore.loop(count=1, timeout=retryafter, use_poll=True)
        if chase_client.spectrum_queue == []:
            break

        now = time.time()

        if now - laststatus >= 10:
            eta_minutes = ((now - start_time)
                           / (spectrum_count - chase_client.spectra_to_go
                              + 0.1)
                           * chase_client.spectra_to_go / 60.0)
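            # (a simple proportional estimate--e.g. 600 s spent with 1000 of
            # 5000 spectra still to go gives 600 / 4000.1 * 1000 / 60, about
            # 2.5 minutes remaining; the 0.1 only guards against division by
            # zero before any results arrive)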
915 info("%s spectra to search, %s chasers, ETA %dm",
916 chase_client.spectra_to_go, len(asyncore.socket_map),
917 int(eta_minutes))
918 laststatus = now
920 while (chase_client.dead_clients
921 and chase_client.dead_clients[0][0] < now - retryafter):
922 debug("died %s restart %s", chase_client.dead_clients[0][0], now)
923 dt, host, port = heapq.heappop(chase_client.dead_clients)
924 chase_client(host, port)
925 if not asyncore.socket_map and chase_client.dead_clients:
926 debug("sleeping")
927 time.sleep(max(0, retryafter - (now
928 - chase_client.dead_clients[0][0])))
929 # failsafe, better performance?
930 #time.sleep(0.1)

    # FIX: could close all client sockets here--worth doing?

    # FIX
    #total_comparisons = sum(ss.candidate_spectrum_count
    #                        for ss in score_statistics)
    #if options.estimate_only:
    #    # divisor is just a slightly informed guess
    #    print ("%.2f generic CPU hours" % (total_comparisons / 439.0e6))
    #    return

    info("writing result file '%s'", result_fn)

    pk = pickle.Pickler(result_file, pickle.HIGHEST_PROTOCOL)
    pk.fast = 1                         # stipulate no circular references
    mod_conjunct_triples = get_mod_conjunct_triples(GLP["potential_mods"],
                                                    GLP["potential_mod_limit"],
                                                    GLP["mass_regimes"])

    pk.dump({ 'version' : VERSION,
              'total comparisons' : 0,
              'spectrum files' : base_spectrum_fns,
              'databases' : databases,
              'parameters' : GLP,
              'mass regime atomic masses' : dump_mass_regime_atomic_masses(),
              'mass regime manifest' : sorted(regime_manifest),
              'proton mass' : PROTON_MASS,
              'modification conjuncts' : mod_conjunct_triples,
              'argv' : sys.argv })

    temp_result_file.flush()            # unneeded?
    for sp_name, offset, length in sorted(chase_client.result_file_index):
        temp_result_file.seek(offset)
        r = temp_result_file.read(length)
        assert len(r) == length
        result_file.write(r)

    pk.dump('complete')                 # so reader can tell file not truncated
    result_file.close()                 # force any file error to occur now
    info("finished")


if __name__ == '__main__':
#     try:
#         import psyco
#         psyco.full()
#         ###psyco.bind(formula_mass)
#         warning('psyco enabled')
#     except ImportError:
#         pass

    try:
        if '--profile' in sys.argv:
            import cProfile
            import pstats
            report_fn = "greylag-rally.prof.%s" % os.getpid()
            data_fn = report_fn + ".tmp"
            prof = cProfile.run('main()', data_fn)
            with open(report_fn, 'w') as report_f:
                try:
                    stats = pstats.Stats(data_fn, stream=report_f)
                    stats.strip_dirs()
                    stats.sort_stats('cumulative')
                    stats.print_stats(50)
                    stats.sort_stats('time')
                    stats.print_stats(50)
                    print "# profile report written to '%s'" % report_fn
                finally:
                    try:
                        os.remove(data_fn)
                    except:
                        pass
        else:
            main()
    except SystemExit:
        raise
    except:
        logging.exception("unhandled exception")
        sys.exit(1)
    finally:
        logging.shutdown()