3 '''This program coordinates the search of mass spectra by sending subsearches
4 to running instances of greylag-chase, which are expected to be listening on
5 the specified hosts/ports.
7 <job-id> is a unique identifier used as a prefix for work files. The
8 <configuration-file> contains program options. The spectra are in the
13 from __future__
import with_statement
16 greylag, a collection of programs for MS/MS protein analysis
17 Copyright (C) 2006-2008 Stowers Institute for Medical Research
19 This program is free software: you can redistribute it and/or modify
20 it under the terms of the GNU General Public License as published by
21 the Free Software Foundation, either version 3 of the License, or
22 (at your option) any later version.
24 This program is distributed in the hope that it will be useful,
25 but WITHOUT ANY WARRANTY; without even the implied warranty of
26 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
27 GNU General Public License for more details.
29 You should have received a copy of the GNU General Public License
30 along with this program. If not, see <http://www.gnu.org/licenses/>.
33 Stowers Institute for Medical Research
35 Kansas City, Missouri 64110
43 import cPickle
as pickle
48 import logging
; from logging
import debug
, info
, warning
51 from pprint
import pprint
, pformat
61 # gc possibly harms performance here, so disable it. gc only matters for
62 # cycles, which we (hope we) don't create. See the gc module docs.
63 import gc
; gc
.disable()
68 # if we're unit testing, just throw an exception
69 if __name__
!= "__main__":
70 raise Exception((s
+ " (fatal error)") % args
)
71 logging
.error(s
, *args
)
74 # fatal for rally, but chase just disconnects
77 # name -> value map of processed greylag config parameters
81 # FIX: This parsing is way too complex. How to simplify?
83 def mass_regime_part(part_specification
):
84 """Parse a single mass regime specification part.
86 >>> mass_regime_part('MONO')
88 >>> mass_regime_part('AVG')
90 >>> mass_regime_part('MONO(N15@87.5%)')
91 ('MONO', [('N15', 0.875)])
94 ps
= [ x
.strip() for x
in part_specification
.partition('(') ]
95 if ps
[0] not in ('MONO', 'AVG'):
96 raise ValueError("invalid mass regime list specification"
97 " (regime id must be 'MONO' or 'AVG')")
101 raise ValueError("invalid mass regime list specification"
103 pps
= [ x
.strip() for x
in ps
[2][:-1].split(',') ]
105 raise ValueError("invalid mass regime list specification"
106 " (multiple isotopes not yet implemented)")
107 ppps
= [ x
.strip() for x
in pps
[0].partition('@') ]
109 raise ValueError("invalid mass regime list specification"
111 if ppps
[0] not in ('N15',):
112 raise ValueError("invalid mass regime list specification"
113 " (isotope id must currently be 'N15')")
114 if ppps
[2][-1] != '%':
115 raise ValueError("invalid mass regime list specification"
117 prevalence
= float(ppps
[2][:-1]) / 100
118 if not (0 <= prevalence
<= 1):
119 raise ValueError("invalid mass regime list specification"
120 " (prevalence must be in range 0-100%)")
121 return (ps
[0], [(ppps
[0], prevalence
)])
124 def mass_regime_list(mass_regime_list_specification
):
125 """Check and return a list of regime tuples (parent_regime,
126 fragment_regime), where each regime is a tuple (id, [(isotope_id,
127 prevalence), ...]). Multiple isotopes (when implemented) would be
130 >>> pprint(mass_regime_list('AVG/MONO;MONO;MONO(N15@75%)'))
131 [[('AVG', []), ('MONO', [])],
132 [('MONO', []), ('MONO', [])],
133 [('MONO', [('N15', 0.75)]), ('MONO', [('N15', 0.75)])]]
137 for regspec
in mass_regime_list_specification
.split(';'):
138 halves
= [ x
.strip() for x
in regspec
.split('/') ]
140 raise ValueError("invalid mass regime list specification"
142 pr
= [ mass_regime_part(h
) for h
in halves
]
144 pr
= [ pr
[0], pr
[0] ]
147 # The first fragmentation regime should generally be MONO, so that
148 # formulaic deltas with '!' do the expected thing.
149 if result
[0][1] != ('MONO', []):
150 raise ValueError("first fragmentation regime was something other than"
151 " 'MONO' with no isotopes--this is almost certainly"
152 " not what was intended")
156 def parse_mod_term(s
, is_potential
=False):
157 """Parse a modification term, returning a tuple (sign, mod, fixed_regime,
158 residues, description, marker). The marker character must be printable
161 >>> parse_mod_term('-C2H3ON!@C')
162 (-1, 'C2H3ON', True, 'C', None, None)
163 >>> parse_mod_term("42@STY phosphorylation '*'", is_potential=True)
164 (1, 42.0, False, 'STY', 'phosphorylation', '*')
168 m
= re
.match(r
'^\s*(-|\+)?(([1-9][0-9.]*)|([A-Z][A-Z0-9]*))(!?)'
169 r
'@([A-Z]+|\[|\])(\s+([A-Za-z0-9_]+))?'
170 r
"(\s+'([[-`!-@{-~])')?\s*$", s
)
172 raise ValueError("invalid modification term specification"
175 invalid_residues
= set(mg
[5]) - set(RESIDUES_W_BRACKETS
)
177 raise ValueError("invalid modification list specification"
178 " (invalid residues %s)" % list(invalid_residues
))
182 if is_potential
and abs(delta
) < 0.0001:
183 raise ValueError("invalid modification list specification"
184 " (delta '%s' is too small)" % delta
)
186 if not is_potential
and len(residues
) != 1:
187 raise ValueError("invalid modification list specification '%s' (only"
188 " potential modifications may have multiple residues)"
190 if len(residues
) != len(set(residues
)):
191 raise ValueError("invalid modification list specification"
192 " '%s' (duplicate residues prohibited)"
194 return (1 if mg
[0] != '-' else -1, delta
, mg
[4] == '!', residues
, mg
[7],
198 def fixed_mod_list(specification
):
199 """Check and return a list of modification tuples.
201 >>> fixed_mod_list('57@C')
202 [(1, 57.0, False, 'C', None, None)]
203 >>> fixed_mod_list("57@C '*',CH!@N desc")
204 [(1, 57.0, False, 'C', None, '*'), (1, 'CH', True, 'N', 'desc', None)]
205 >>> fixed_mod_list('57@C,58@C')
206 Traceback (most recent call last):
208 ValueError: invalid modification list specification '['C', 'C']' (duplicate residues prohibited)
211 if not specification
:
213 result
= [ parse_mod_term(s
) for s
in specification
.split(',') ]
214 residues
= [ x
[3] for x
in result
]
215 # this check is across terms; the one above is within terms
216 if len(residues
) != len(set(residues
)):
217 raise ValueError("invalid modification list specification"
218 " '%s' (duplicate residues prohibited)"
220 debug("fixed_mod_list:\n%s", pformat(result
))
224 def parse_mod_basic_expression(s
):
225 """Parse s looking for either a term or a parenthesized subexpression.
226 Return the pair (tree of modification tuples, unparsed suffix of s).
230 tree
, rest
= parse_mod_disjunction(s
[1:])
233 raise ValueError("invalid modification list specification"
234 " (expected matching ')')")
235 return tree
, rest
[1:]
236 parts
= re
.split(r
'([;,()])', s
, 1)
240 assert len(parts
) == 3
241 term
, rest
= parts
[0], parts
[1]+parts
[2]
242 return parse_mod_term(term
, is_potential
=True), rest
244 def parse_mod_conjunction(s
):
245 """Parse s looking for a ','-separated sequence of subexpressions. Return
246 the pair (tree of modification tuples, unparsed suffix of s).
250 tree
, s
= parse_mod_basic_expression(s
)
258 def parse_mod_disjunction(s
):
259 """Parse s looking for a ';'-separated sequence of subexpressions. Return
260 the pair (tree of modification tuples, unparsed suffix of s).
265 tree
, s
= parse_mod_conjunction(s
)
273 def potential_mod_list(specification
):
274 """Check and return a tree of potential modification tuples. Nodes at
275 even (odd) levels are disjunctions (conjunctions). (The top list is a
278 >>> potential_mod_list('')
280 >>> pprint(potential_mod_list('PO3H@STY; C2H2O@KST'))
281 [[(1, 'PO3H', False, 'STY', None, None)],
282 [(1, 'C2H2O', False, 'KST', None, None)]]
283 >>> pprint(potential_mod_list('PO3H@STY, C2H2O@KST'))
284 [[(1, 'PO3H', False, 'STY', None, None),
285 (1, 'C2H2O', False, 'KST', None, None)]]
286 >>> pprint(potential_mod_list('''(PO3H@STY phosphorylation '*';
287 ... C2H2O@KST acetylation '^';
288 ... CH2@AKST methylation '#'),
289 ... O@M oxidation '@'
291 [[[[(1, 'PO3H', False, 'STY', 'phosphorylation', '*')],
292 [(1, 'C2H2O', False, 'KST', 'acetylation', '^')],
293 [(1, 'CH2', False, 'AKST', 'methylation', '#')]],
294 (1, 'O', False, 'M', 'oxidation', '@')]]
297 if not specification
:
299 tree
, remainder
= parse_mod_disjunction(specification
)
301 raise ValueError("invalid modification list specification"
302 " (unexpected '%s')" % remainder
)
303 debug("potential_mod_list:\n%s", pformat(tree
))
307 def p_positive(x
): return x
> 0
308 def p_negative(x
): return x
< 0
309 def p_nonnegative(x
): return x
>= 0
310 def p_proportion(x
): return 0 <= x
<= 1
312 # Configuration file parameter specification
314 # name -> (type, default, check_fn)
315 # type may be bool, int, float, str, modlist, or a tuple of values
316 # default, as a string value, or None if value must be explicitly specified
317 # check_fn is an optional function that returns True iff the value is valid
320 "databases" : (str, None),
321 "mass_regimes" : (mass_regime_list
, "MONO"),
322 "mass_regime_debug_delta" : (float, 0),
323 "pervasive_mods" : (fixed_mod_list
, ""),
324 "potential_mods" : (potential_mod_list
, ""),
325 "potential_mod_limit" : (int, 2, p_nonnegative
),
326 "enable_pca_mods" : (bool, "yes"),
327 "charge_limit" : (int, 3, p_positive
),
328 "min_peptide_length" : (int, 5, p_positive
),
329 "cleavage_motif" : (str, "[X]|[X]"),
330 "maximum_missed_cleavage_sites" : (int, 1, p_nonnegative
),
331 "min_parent_spectrum_mass" : (float, 0, p_nonnegative
),
332 "max_parent_spectrum_mass" : (float, 10000, p_nonnegative
),
333 "TIC_cutoff_proportion" : (float, 0.98, p_proportion
),
334 "parent_mz_tolerance" : (float, 1.25, p_nonnegative
),
335 "fragment_mass_tolerance" : (float, 0.5, p_nonnegative
),
336 "intensity_class_count" : (int, 3, p_positive
),
337 "intensity_class_ratio" : (float, 2.0, p_positive
), # really > 1.0?
338 "best_result_count" : (int, 5, p_positive
),
342 def validate_parameters(parameters
, parameter_info
=PARAMETER_INFO
):
343 """Verify that parameters are valid, have valid values, and correspond to
344 currently implemented functionality. Values are converted, default values
345 are filled in, and the resulting name/value dict returned.
347 >>> sorted(validate_parameters({'spectrum, total peaks' : '40'},
348 ... {'spectrum, total peaks'
349 ... : (int, '50', p_positive),
350 ... 'output, spectra'
351 ... : (bool, 'no')}).items())
352 [('output, spectra', False), ('spectrum, total peaks', 40)]
357 for p_name
, p_info
in sorted(parameter_info
.items()):
360 check_fn
= p_info
[2] if len(p_info
) > 2 else None
362 v
= parameters
.get(p_name
)
365 debug("parameter '%s' defaulting to '%s'", p_name
, default
)
368 error("missing required parameter '%s'" % p_name
)
369 if isinstance(type_
, tuple):
371 error("parameter '%s' value '%s' not in %s (feature not"
372 " implemented?)" % (p_name
, v
, type_
))
374 v
= { 'yes' : True, 'no' : False }.get(v
)
376 error("parameter '%s' requires a value of 'yes' or 'no'")
380 except ValueError, e
:
381 error("parameter '%s' has value '%s' with invalid format [%s]"
383 if check_fn
and not check_fn(v
):
384 error("parameter '%s' has invalid value '%s' (or feature not"
385 " implemented)" % (p_name
, v
))
388 unknown_parameters
= set(parameters
) - set(parameter_info
)
389 if unknown_parameters
:
390 warning("%s unknown parameter(s):\n %s", len(unknown_parameters
),
391 pformat(sorted(list(unknown_parameters
))))
395 def file_contents(filename
):
396 """Return the contents of the file."""
397 with
open(filename
) as f
:
401 # about 100/s, probably fast enough (just transmitting takes longer?)
402 def generate_spectra_from_ms2_files(spectrum_fns
):
403 """Read ms2 files, yielding tuples (file_index, physical_index, index,
404 name, mass, charge, peaks). The indexes are 0-based. Mass and charge are
405 returned as strings, and peaks is the newline-embedded peak block from the
408 header_re
= re
.compile(r
'^:', re
.MULTILINE
)
410 # check open-ability of all files immediately, to avoid surprises later
411 for fn
in spectrum_fns
:
417 for file_index
, spfn
in enumerate(spectrum_fns
):
418 contents
= file_contents(spfn
)
419 for block
in header_re
.split(contents
)[1:]:
420 b
= block
.split('\n', 2)
422 error("malformed ms2 file '%s' (malformed header)" % spfn
)
423 name
, h_line1
, peaks
= b
424 h
= h_line1
.split(' ', 1)
426 error("malformed ms2 file '%s' (malformed mass/charge header)"
429 pending_headers
.append((name
, mass
, charge
))
431 for name
, mass
, charge
in pending_headers
:
432 yield (file_index
, physical_index
, index
,
433 name
, mass
, charge
, peaks
)
438 error("malformed ms2 file '%s' (headers at end?)" % spfn
)
441 def count_ms2_spectra(spectrum_fns
):
442 """Return count of ms2 spectra, for ETA calculation, etc."""
443 return sum(file_contents(fn
).count(':') for fn
in spectrum_fns
)
446 # This uses too much RAM...
447 # def check_ms2_files(spectrum_fns):
448 # """Verify that the spectrum files are well-formed, calling error if not."""
450 # ms2_re = re.compile(r'''
453 # [1-9]\d*\.\d+\ [1-9]\d*\r?\n)+
454 # (\d+\.\d+\ \d+(\.\d+)?\r?\n)+
458 # for file_index, spfn in enumerate(spectrum_fns):
459 # with open(spfn) as specfile:
460 # contents = specfile.read()
461 # info("SKIP checking '%s' (%s bytes)", spfn, len(contents))
462 # #if not ms2_re.match(contents):
463 # # error("invalid ms2 file '%s'", spfn)
466 # FIX: optionally print peak statistics? (slow?)
467 # def peak_statistics(spectra):
468 # counts = [ len(sp.peaks) for sp in spectra ]
471 # return (counts[0], counts[int(n*0.25)], counts[int(n*0.5)],
472 # counts[int(n*0.75)], counts[-1], sum(counts) / float(n))
474 # def print_spectrum_statistics(spectra):
475 # info(" %s spectra (mass range %s - %s)", len(spectra),
476 # spectra[0].mass, spectra[-1].mass)
477 # info(" peak stats: %s..%s..%s..%s..%s (mean=%.2f)"
478 # % peak_statistics(spectra))
481 class chase_client(asynchat
.async_chat
):
483 # heapq of (deathtime, host, port) for connections that have died or
484 # timed-out. deathtime is the time of death. It's up to external code to
485 # recreate after a suitable wait, if desired.
488 # cached so that we don't have thousands of identical copies
489 pickled_parameters
= None
490 pickled_sequences
= None
492 spectrum_generator
= None
494 # (host, port) -> (submittime, [ spectrum, ... ])
495 # tracks the spectra each chase_client is currently searching
496 spectra_in_progress
= {}
498 # heapq of (lastsubmittime, spectrum) for spectra submitted (at least
499 # once) and (possibly) not yet completed. lastsubmittime is provided so
500 # that we can submit the least recently submitted spectrum next.
501 # Not populated until spectrum_generator has been exhausted (to avoid
502 # keeping lots of spectra in memory).
503 spectrum_queue
= None
505 # set of spectrum_no's of spectra already searched (only those searched
506 # since we populated spectrum_queue)
507 searched_spectra
= set()
509 # number of spectra for which results not yet received
512 # The number of spectra submitted to clients for a search command. This
513 # can be adjusted upwards for efficiency.
514 spectrum_batch_size
= 1
516 # file on which to write pickled results
518 # index of result_file
519 # [ (spectrum_id, offset, length), ... ]
520 result_file_index
= []
523 def set_parameters(self
, parameters
):
524 assert self
.pickled_parameters
== None, "no change once set"
525 self
.pickled_parameters
= pickle
.dumps(parameters
)
528 def set_sequences(self
, sequences
):
529 assert self
.pickled_sequences
== None, "no change once set"
530 self
.pickled_sequences
= pickle
.dumps(sequences
)
533 def set_result_file(self
, result_file
):
534 assert self
.result_file
== None, "no change once set"
535 self
.result_file
= result_file
538 def set_spectra(self
, spectrum_generator
, spectra_to_go
):
539 """Fix the list of spectra to be searched, which must be done before
540 any clients are created (and cannot be changed later).
542 assert self
.spectrum_generator
== None, "no change once set"
543 self
.spectrum_generator
= spectrum_generator
544 self
.spectra_to_go
= spectra_to_go
547 def _remember_result(self
, spectrum_id
, result
):
549 offset
= self
.result_file
.tell()
550 del result
[0]['id'] # ids differ for future subset files
551 pickle
.dump(result
, self
.result_file
, pickle
.HIGHEST_PROTOCOL
)
552 length
= self
.result_file
.tell() - offset
553 self
.result_file_index
.append((result
[0]['name'], offset
, length
))
554 assert spectrum_id
not in self
.searched_spectra
555 self
.searched_spectra
.add(spectrum_id
)
556 self
.spectra_to_go
-= 1
559 def __init__(self
, host
, port
):
560 asynchat
.async_chat
.__init
__(self
)
561 assert self
.pickled_parameters
and self
.pickled_sequences
562 assert self
.spectrum_generator
567 self
.set_terminator('\n')
571 self
.create_socket(socket
.AF_INET
, socket
.SOCK_STREAM
)
572 self
.set_keep_alive()
573 self
.connect((host
, port
))
575 # override asynchat version to use a reasonable buffer size
576 def push(self
, data
):
577 self
.producer_fifo
.push(
578 asynchat
.simple_producer(data
, buffer_size
=self
.ac_out_buffer_size
))
582 return "<%s connected to %s:%s>" % (self
.__class
__.__name
__,
583 self
.host
, self
.port
)
586 def set_keep_alive(self
):
587 # try to better notice reboots, net failures, etc
589 self
.socket
.setsockopt(
590 socket
.SOL_SOCKET
, socket
.SO_KEEPALIVE
,
591 self
.socket
.getsockopt(socket
.SOL_SOCKET
,
592 socket
.SO_KEEPALIVE
) |
1
598 def _send_command(self
, command
, pickled_argument
):
599 self
.push(command
+ ' ')
600 self
.push("%s\n" % len(pickled_argument
))
601 self
.push(pickled_argument
)
603 def _submit_search(self
):
604 "send a search to client"
606 batch_size
= self
.spectrum_batch_size
608 # start slow, to get chasers going immediately, then increase batch
609 # size to increase master efficiency (and level load)
610 #batch_size = min(self.spectrum_batch_size,
611 # 4**min(self.submit_count-1,10))
613 #if self.spectrum_queue:
614 # # on each retry round, divide batch size by four
615 # batch_size = max(1, (batch_size
616 # / 2**min(64, 2*self.spectrum_queue[0][0])))
618 # get some never-searched spectra, if there are any
619 submit_spectra
= list(itertools
.islice(self
.spectrum_generator
,
622 self
.spectra_in_progress
[(self
.host
, self
.port
)] = (time
.time(),
625 if self
.spectrum_queue
== None:
626 # move spectra_in_progress to spectrum_queue
627 self
.__class
__.spectrum_queue
= []
628 for t
, subsp
in self
.spectra_in_progress
.values():
630 heapq
.heappush(self
.spectrum_queue
, (t
, sp
))
631 self
.__class
__.spectra_in_progress
= None
633 # generate submit_spectra from spectrum_queue
634 while self
.spectrum_queue
and len(submit_spectra
) < batch_size
:
635 _submittime
, spectrum
= heapq
.heappop(self
.spectrum_queue
)
636 # spectrum = (file_index, physical_index, index, name, mass,
638 if spectrum
[2] in self
.searched_spectra
:
639 continue # already have search result
640 heapq
.heappush(self
.spectrum_queue
, (time
.time(), spectrum
))
641 submit_spectra
.append(spectrum
)
644 debug("submitting %s spectra (%s not yet retired)",
645 len(submit_spectra
), self
.spectra_to_go
)
646 self
._send
_command
('spectra', pickle
.dumps(submit_spectra
))
647 self
._send
_command
('search', pickle
.dumps(None))
651 def _receive_response(self
):
652 r
= ''.join(self
.ibuffer
)
657 def handle_connect(self
):
658 debug("connecting to %s:%s", self
.host
, self
.port
)
660 def handle_expt(self
):
663 def handle_error(self
):
665 exc
, why
, _traceback
= sys
.exc_info()
666 self
.connected
= False
667 if exc
== exceptions
.KeyboardInterrupt: # FIX: works?
668 error("received keyboard interrupt")
669 if exc
== socket
.error
:
670 if why
[0] == errno
.ECONNREFUSED
:
671 debug("no chaser at %s:%s (connection refused)", self
.host
,
674 info("network error on connection %s:%s (%s %s)", self
.host
,
676 debug(" traceback: %s", asyncore
.compact_traceback()[3])
678 info("unexpected error on connection %s:%s (%s %s)", self
.host
,
680 info(" traceback: %s", asyncore
.compact_traceback()[3])
683 def handle_close(self
):
685 #self.__class__.dead_clients.append((time.time(), self.host, self.port))
686 self
.dead_clients
.append((time
.time(), self
.host
, self
.port
))
687 asynchat
.async_chat
.handle_close(self
)
690 def collect_incoming_data(self
, data
):
691 self
.ibuffer
.append(data
)
693 def found_terminator(self
):
694 if self
.banner
== None:
695 self
.banner
= self
._receive
_response
()
696 banner_words
= self
.banner
.split()
697 if banner_words
[0] != 'greylag':
698 warning("%s:%s is not a greylag-chase", self
.host
, self
.port
)
701 if banner_words
[2:3] != ['ready']:
702 debug("greylag-chase %s:%s is serving %s", self
.host
,
703 self
.port
, banner_words
[3:4])
706 # now we can start talking
707 self
._send
_command
('parameters', self
.pickled_parameters
)
708 self
._send
_command
('sequences', self
.pickled_sequences
)
709 self
._submit
_search
()
712 if self
.get_terminator() == '\n':
713 reply
= self
._receive
_response
()
714 reply_words
= reply
.split()
715 if reply_words
[0] == 'error':
716 warning("%s:%s gave '%s'", self
.host
, self
.port
, reply
)
720 assert reply_words
[0] == 'found' and len(reply_words
) == 2
721 self
.set_terminator(int(reply_words
[1]))
724 result
= pickle
.loads(self
._receive
_response
())
726 #debug("received: %s" % result)
727 # FIX: check that multiply searched spectra had same results
728 for noise_sp_id
in result
['noise_spectrum_ids']:
729 if noise_sp_id
not in self
.searched_spectra
:
730 self
._remember
_result
(noise_sp_id
, False)
732 for sp_id
, sp_matches
in result
['matches'].iteritems():
733 if sp_id
not in self
.searched_spectra
:
734 self
._remember
_result
(sp_id
, sp_matches
)
736 self
.set_terminator('\n')
737 self
._submit
_search
()
740 def main(args
=sys
.argv
[1:]):
741 parser
= optparse
.OptionParser(usage
=
742 "usage: %prog [options]"
743 " <configuration-file> <ms2-file>...",
744 description
=__doc__
, version
=VERSION
)
745 pa
= parser
.add_option
746 DEFAULT_HOSTFILE
= "/etc/greylag/hosts"
747 pa("--hostfile", dest
="hostfile", default
=DEFAULT_HOSTFILE
,
748 help="file listing host:port locations where greylag-chase workers are"
749 " listening. [default '%s']" % DEFAULT_HOSTFILE
)
750 pa("--job-id", dest
="job_id", default
="unknown",
751 help="used to generate unique output filenames [default 'unknown']")
752 #pa("--estimate", action="store_true", dest="estimate_only",
753 # help="just estimate the time required for the search")
754 pa("-v", "--verbose", action
="store_true", dest
="verbose",
756 pa("-q", "--quiet", action
="store_true", dest
="quiet", help="no warnings")
757 pa("-l", "--logfile", dest
="logfile",
758 help="log to FILE instead of stderr", metavar
="FILE")
759 pa("-P", "--parameter", dest
="parameters", action
="append",
761 help="override a parameter in <configuration-file>, may be used"
762 " multiple times", metavar
="NAME=VALUE")
763 pa("--copyright", action
="store_true", dest
="copyright",
764 help="print copyright and exit")
765 pa("--debug", action
="store_true", dest
="debug",
766 help="output debugging info")
767 pa("--profile", action
="store_true", dest
="profile",
768 help="dump Python profiling output to './greylag-rally.prof.<pid>'")
769 (options
, args
) = parser
.parse_args(args
=args
)
771 if options
.copyright
:
779 configuration_fn
= args
[0]
780 spectrum_fns
= args
[1:]
782 if (any(True for f
in spectrum_fns
if not f
.endswith('.ms2'))
783 or any(True for p
in options
.parameters
if '=' not in p
)):
789 info("starting on %s", socket
.gethostname())
791 # check early that we can open result files
792 result_fn
= 'chase_%s.glw' % options
.job_id
793 result_file
= open(result_fn
, 'wb')
794 temp_result_file
= tempfile
.TemporaryFile(dir='.', prefix
=result_fn
+'-')
796 # read chaser (host, port) listener list
798 with
open(options
.hostfile
) as hostf
:
799 hosts
= [ l
.split(':', 1) for l
in hostf
]
800 hosts
= [ (host
, int(port
)) for host
, port
in hosts
]
802 error("invalid or empty host line in '%s'" % options
.hostfile
)
805 error("no valid search hosts specified")
807 # check spectrum basename uniqueness, as corresponding sqt files will be
808 # in a single directory
809 base_spectrum_fns
= [ os
.path
.basename(fn
) for fn
in spectrum_fns
]
810 if len(base_spectrum_fns
) != len(set(base_spectrum_fns
)):
811 error("base spectrum filenames must be unique")
813 options
.parameters
= [ p
.split('=', 1) for p
in options
.parameters
]
815 # check -P names for validity
816 bad_names
= (set(n
for n
,v
in options
.parameters
)
817 - set(PARAMETER_INFO
.keys()))
819 error("bad -P parameter names %s" % list(bad_names
))
822 cp
= ConfigParser
.RawConfigParser()
823 cp
.optionxform
= str # be case-sensitive
824 with
open(configuration_fn
) as configuration_file
:
825 cp
.readfp(configuration_file
)
826 if not cp
.has_section('greylag'):
827 error("%s has no [greylag] section" % configuration_fn
)
828 parameters
= dict(cp
.items('greylag'))
829 parameters
.update(dict(options
.parameters
)) # command-line override
831 GLP
= validate_parameters(parameters
)
833 # test parse cleavage motif here, so we notice errors early
834 cleavage_motif_re(GLP
["cleavage_motif"])
836 fixed_mod_map
= dict((r
[3], r
) for r
in GLP
["pervasive_mods"])
837 # FIX: is this even necessary?
838 regime_manifest
= initialize_spectrum_parameters(options
, GLP
,
841 info("reading sequence databases")
843 databases
= GLP
["databases"].split()
844 # [(locusname, defline, seq, filename), ...]
845 fasta_db
= list(read_fasta_files(databases
))
847 # This is what we'll send to chasers. Either (1) the already parsed
848 # contents of the databases, which is slow to send, or (2) a list of
849 # (database name, sha1 checksums), which requires that the files exist on
851 db_representation
= ("value", fasta_db
)
852 if 1: # FIX: make this an option?
853 db_representation
= ("name",
854 [ (db
, file_sha1(db
)) for db
in databases
])
856 # FIX: is this worth doing here?
857 #info("checking spectrum files")
858 #check_ms2_files(spectrum_fns)
859 #info("spectrum files okay")
861 spectrum_count
= count_ms2_spectra(spectrum_fns
)
863 if not spectrum_count
:
864 error("no input spectra")
865 info("counted %s spectra", spectrum_count
)
867 # FIX: gather these statistics from chasers? (!)
868 # before/after filtering...
869 # print_spectrum_statistics(spectra)
871 # now connect to chasers and loop until done (or timeout)
872 chase_client
.set_parameters(GLP
)
873 chase_client
.set_sequences(db_representation
)
874 chase_client
.set_spectra(generate_spectra_from_ms2_files(spectrum_fns
),
876 chase_client
.set_result_file(temp_result_file
)
878 # Trivial strategy: Hand each client 1/10th of their likely workload in
879 # each batch. If client crashes, we haven't lost too much work.
880 # Making this value larger keeps polling efficient and lets us handle a
881 # large number of clients. If it's too large, though, we're need more RAM
882 # for temporary storage of pythonized spectra.
883 chase_client
.spectrum_batch_size
= min(100, max(1, int(0.1 * spectrum_count
886 # FIX: this is actually faster (?)
887 #chase_client.spectrum_batch_size = 1
889 # adjust for performance
890 chase_client
.ac_in_buffer_size
= 62000
891 chase_client
.ac_out_buffer_size
= 62000
892 #chase_client.ac_in_buffer_size = 620000
893 #chase_client.ac_out_buffer_size = 620000
895 for host
, port
in hosts
:
896 chase_client(host
, port
)
898 # retry time for dead clients (in seconds)
901 start_time
= time
.time()
904 asyncore
.loop(count
=1, timeout
=retryafter
, use_poll
=True)
905 if chase_client
.spectrum_queue
== []:
910 if now
- laststatus
>= 10:
911 eta_minutes
= ((now
- start_time
)
912 / (spectrum_count
- chase_client
.spectra_to_go
914 * chase_client
.spectra_to_go
/ 60.0)
915 info("%s spectra to search, %s chasers, ETA %dm",
916 chase_client
.spectra_to_go
, len(asyncore
.socket_map
),
920 while (chase_client
.dead_clients
921 and chase_client
.dead_clients
[0][0] < now
- retryafter
):
922 debug("died %s restart %s", chase_client
.dead_clients
[0][0], now
)
923 dt
, host
, port
= heapq
.heappop(chase_client
.dead_clients
)
924 chase_client(host
, port
)
925 if not asyncore
.socket_map
and chase_client
.dead_clients
:
927 time
.sleep(max(0, retryafter
- (now
928 - chase_client
.dead_clients
[0][0])))
929 # failsafe, better performance?
933 # FIX: could close all client sockets here--worth doing?
936 #total_comparisons = sum(ss.candidate_spectrum_count
937 # for ss in score_statistics)
938 #if options.estimate_only:
939 # # divisor is just a slightly informed guess
940 # print ("%.2f generic CPU hours" % (total_comparisons / 439.0e6))
943 info("writing result file '%s'", result_fn
)
945 pk
= pickle
.Pickler(result_file
, pickle
.HIGHEST_PROTOCOL
)
946 pk
.fast
= 1 # stipulate no circular references
947 mod_conjunct_triples
= get_mod_conjunct_triples(GLP
["potential_mods"],
948 GLP
["potential_mod_limit"],
951 pk
.dump({ 'version' : VERSION
,
952 'total comparisons' : 0,
953 'spectrum files' : base_spectrum_fns
,
954 'databases' : databases
,
956 'mass regime atomic masses' : dump_mass_regime_atomic_masses(),
957 'mass regime manifest' : sorted(regime_manifest
),
958 'proton mass' : PROTON_MASS
,
959 'modification conjuncts' : mod_conjunct_triples
,
962 temp_result_file
.flush() # unneeded?
963 for sp_name
, offset
, length
in sorted(chase_client
.result_file_index
):
964 temp_result_file
.seek(offset
)
965 r
= temp_result_file
.read(length
)
966 assert len(r
) == length
969 pk
.dump('complete') # so reader can tell file not truncated
970 result_file
.close() # force any file error to occur now
974 if __name__
== '__main__':
978 # ###psyco.bind(formula_mass)
979 # warning('psyco enabled')
980 # except ImportError:
984 if '--profile' in sys
.argv
:
987 report_fn
= "greylag-rally.prof.%s" % os
.getpid()
988 data_fn
= report_fn
+ ".tmp"
989 prof
= cProfile
.run('main()', data_fn
)
990 with
open(report_fn
, 'w') as report_f
:
992 stats
= pstats
.Stats(data_fn
, stream
=report_f
)
994 stats
.sort_stats('cumulative')
995 stats
.print_stats(50)
996 stats
.sort_stats('time')
997 stats
.print_stats(50)
998 print "# profile report written to '%s'" % report_fn
1009 logging
.exception("unhandled exception")