Merge branch 'slowbisect'
[greylag.git] / greylag_chase.py
blob1278f94c869cc011420cbe9abbe7ebe10e516f36
1 #!/usr/bin/env python
3 '''This program does the actual search of mass spectra against a sequence
4 database. It listens for a connection from greylag-rally, which sends all of
5 the information necessary to do the search, and returns the search results on
6 the same connection. It will continue to process further search requests on
7 the connection, timing out if none are received within the specified interval.
8 Then new connections will be accepted, one at a time. If there are no new
9 connections within the specified interval, the program exits.
11 On startup, the program will attempt to listen on a sequence of ports,
12 depending on the values of --port and --port-count. If these are 12345 and 4,
13 for example, it will try ports 12345, 12346, 12347, and 12348, using the first
14 available. Since only one process is allowed to listen on each port, this
15 functions as a form of locking that will prevent more than --port-count
16 occurrences of this program from running on the host simultaneously.
18 At most, this program accesses the sequence databases through the filesystem.
19 '''
21 from __future__ import with_statement
23 __copyright__ = '''
24 greylag, a collection of programs for MS/MS protein analysis
25 Copyright (C) 2006-2008 Stowers Institute for Medical Research
27 This program is free software: you can redistribute it and/or modify
28 it under the terms of the GNU General Public License as published by
29 the Free Software Foundation, either version 3 of the License, or
30 (at your option) any later version.
32 This program is distributed in the hope that it will be useful,
33 but WITHOUT ANY WARRANTY; without even the implied warranty of
34 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
35 GNU General Public License for more details.
37 You should have received a copy of the GNU General Public License
38 along with this program. If not, see <http://www.gnu.org/licenses/>.
40 Contact: Mike Coleman
41 Stowers Institute for Medical Research
42 1000 East 50th Street
43 Kansas City, Missouri 64110
44 USA
45 '''
48 import cPickle as pickle
49 import errno
50 import logging; from logging import debug, info, warning
51 import optparse
52 import os
53 from pprint import pprint, pformat
54 import re
55 import socket
56 import sys
58 from greylag import *
59 import cgreylag
62 # gc possibly harms performance here, so disable it. gc only matters for
63 # cycles, which we (hope we) don't create. See the gc module docs.
64 import gc; gc.disable()
67 def error(s, *args):
68 "fatal error --> exit with error"
69 # if we're unit testing, just throw an exception
70 if __name__ != "__main__":
71 raise Exception((s + " (fatal error)") % args)
72 logging.error(s, *args)
73 sys.exit(1)
75 class ChaseException(Exception): pass
77 def chase_error(s, *args):
78 "error --> disconnect from client"
79 logging.error(s, *args)
80 raise ChaseException((s + " (disconnecting)") % args)
84 # Try to drop dead immediately on SIGINT (control-C), instead of normal Python
85 # KeyboardInterrupt processing, since we may spend long periods of time in
86 # uninterruptible C++ calls.
87 try:
88 import signal
89 signal.signal(signal.SIGINT, signal.SIG_DFL)
90 #signal.signal(signal.SIGPIPE, signal.SIG_DFL)
91 except:
92 pass
95 # name -> value map of processed greylag config parameters
96 GLP = {}
99 def generate_cleavage_points(cleavage_re, cleavage_pos, sequence):
100 """Yields the offsets of the cleavages in sequence. The endpoints are
101 always included, by convention.
103 >>> list(generate_cleavage_points(re.compile('[KR](?=[^P])'), 1,
104 ... 'ARBCKDEKPF'))
105 [0, 2, 5, 10]
108 yield 0
109 for m in cleavage_re.finditer(sequence):
110 p = m.start() + cleavage_pos
111 if 0 < p < len(sequence):
112 yield p
113 yield len(sequence)
116 # A possible PCA (pyrrolidone carboxyl acid) modification accounts for
117 # circularization of the peptide N-terminal. PCA mods are excluded if a
118 # static N-terminal mod has been specified. Likewise, choosing a PCA mod will
119 # exclude choosing a potential N-terminal mod. (The PCA mod for 'C' is
120 # dependent on a static mod of C+57 being in effect.)
122 def get_pca_table(mass_regimes):
123 """Return a list of tuples (residues, parent delta, fragment delta)
124 describing the PCA possibilities for each mass regime. Residues is a
125 string specifying the residues this PCA delta applies to.
127 # FIX: According to Xtandem, C is only a candidate for PCA if
128 # carboxyamidomethylated (C+57). Currently we always search it.
129 if GLP["enable_pca_mods"]:
130 return [ [('', 0, 0),
131 ('E', -1 * CP.parent_mass_regime[r].water_mass,
132 -1 * CP.fragment_mass_regime[r].water_mass),
133 ('QC', -1 * CP.parent_mass_regime[r].ammonia_mass,
134 -1 * CP.fragment_mass_regime[r].ammonia_mass)]
135 for r in range(len(mass_regimes)) ]
136 return [ [('', 0, 0)] for r in range(len(mass_regimes)) ]
139 def gen_delta_bag_counts(i, remainder, bag):
140 if i < 1:
141 assert i == 0
142 bag[0] = remainder
143 yield tuple(bag)
144 return
145 for delta in range(1, remainder-i+1):
146 bag[i] = delta
147 for x in gen_delta_bag_counts(i-1, remainder-delta, bag):
148 yield x
150 def generate_delta_bag_counts(mod_count, conjunct_length):
151 """Generate all tuples of positive integers having length conjunct_length
152 and sum mod_count. As a special case, () is such a tuple having length 0
153 and sum 0.
155 >>> for i in range(6): print i, list(generate_delta_bag_counts(4, i))
157 0 []
158 1 [(4,)]
159 2 [(3, 1), (2, 2), (1, 3)]
160 3 [(2, 1, 1), (1, 2, 1), (1, 1, 2)]
161 4 [(1, 1, 1, 1)]
162 5 []
163 >>> list(generate_delta_bag_counts(0, 0))
164 [()]
165 >>> list(generate_delta_bag_counts(0, 1))
169 if conjunct_length == 0:
170 return [()] if mod_count == 0 else []
171 if mod_count < conjunct_length:
172 return []
173 return gen_delta_bag_counts(conjunct_length - 1, mod_count,
174 [0] * conjunct_length)
177 def set_context_conjuncts(context, mass_regime_index, R_cj):
178 context.delta_bag_lookup.clear()
179 context.delta_bag_lookup.resize(ord('Z')+1)
180 context.delta_bag_delta.clear()
181 for n, cj in enumerate(R_cj):
182 context.delta_bag_delta.append(cj[6][mass_regime_index][1])
183 for r in cj[3]:
184 context.delta_bag_lookup[ord(r)] \
185 = context.delta_bag_lookup[ord(r)] + (n,)
188 def search_all(context, mod_limit, mod_conjunct_triples, score_statistics):
189 """Search sequence database against searchable spectra."""
191 mass_regimes = GLP["mass_regimes"]
192 pca_table = get_pca_table(mass_regimes)
193 debug("pca_table: %s", pca_table)
195 total_evaluation_count = 0
197 for mod_count in range(mod_limit + 1):
198 context.mod_count = mod_count
199 for mr_index, (mass_regime, pca_entry) in enumerate(zip(mass_regimes,
200 pca_table)):
201 context.mass_regime_index = mr_index
202 for pca_res, pca_parent_delta, pca_frag_delta in pca_entry:
203 context.pca_residues = pca_res
204 context.pca_delta = pca_frag_delta
205 for cji, (N_cj, C_cj, R_cj) in enumerate(mod_conjunct_triples):
206 if pca_res and N_cj:
207 continue # mutually exclusive, for now
208 context.conjunct_index = cji
209 set_context_conjuncts(context, mr_index, R_cj)
210 debug("cj_triple: N=%s C=%s R=%s", N_cj, C_cj, R_cj)
211 for delta_bag in generate_delta_bag_counts(mod_count,
212 len(R_cj)):
213 # this clear() avoids a SWIG/STL bug!?
214 context.delta_bag_count.clear()
215 context.delta_bag_count[:] = delta_bag
217 parent_delta = sum(count*r_cj[6][mr_index][0]
218 for count, r_cj
219 in zip(delta_bag, R_cj))
220 debug("parent_delta: %s", parent_delta)
222 pmrf = CP.parent_mass_regime[mr_index].fixed_residue_mass
223 fmrf = CP.fragment_mass_regime[mr_index].fixed_residue_mass
224 p_fx = (pmrf[ord('[')]
225 + (N_cj[0][6][mr_index][0] if N_cj else 0)
226 + pmrf[ord(']')]
227 + (C_cj[0][6][mr_index][0] if C_cj else 0)
228 + parent_delta + pca_parent_delta + PROTON_MASS)
229 context.parent_fixed_mass = p_fx
230 f_N_fx = (fmrf[ord('[')]
231 + (N_cj[0][6][mr_index][1] if N_cj else 0)
232 + pca_frag_delta)
233 context.fragment_N_fixed_mass = f_N_fx
234 f_C_fx = (fmrf[ord(']')]
235 + (C_cj[0][6][mr_index][1] if C_cj else 0)
236 + CP.fragment_mass_regime[mr_index].water_mass)
237 context.fragment_C_fixed_mass = f_C_fx
239 info("MC=%s MR=%s PCA=%s CJ=%s DB=%s", mod_count,
240 mr_index, pca_res, cji, delta_bag)
241 debug("p_fx %s f_N_fx %s f_C_fx %s", p_fx, f_N_fx,
242 f_C_fx)
244 score_statistics.evaluation_count = 0
245 cgreylag.spectrum.search_runs(context,
246 score_statistics)
247 total_evaluation_count \
248 += score_statistics.evaluation_count
249 info(" %20s evaluations, this bag",
250 score_statistics.evaluation_count)
252 info('%s candidate spectra examined',
253 score_statistics.candidate_spectrum_count)
254 info('%s total evaluations', score_statistics.evaluation_count)
257 AA_SEQUENCE = re.compile(r'[ARNDCQEGHILKMFPSTWYV]+')
259 def split_sequence_into_aa_runs(idno, locusname, defline, sequence, filename):
260 """Returns a tuple (idno, start, locusname, defline, seq, filename) for
261 each contiguous run of residues in sequence, where 'start' is the position
262 of 'seq' in 'sequence'.
264 >>> pprint(split_sequence_into_aa_runs(123, 'ln', 'ln defline',
265 ... 'STSS*DEFABA', 'filename'))
266 [(123, 0, 'ln', 'ln defline', 'STSS', 'filename'),
267 (123, 5, 'ln', 'ln defline', 'DEFA', 'filename'),
268 (123, 10, 'ln', 'ln defline', 'A', 'filename')]
271 return [ (idno, m.start(), locusname, defline, m.group(), filename)
272 for n, m in enumerate(AA_SEQUENCE.finditer(sequence)) ]
275 def fix_up_flanking_residues(fasta_db, best_matches):
276 """For each peptide match, choose one of the match loci and update the
277 N_peptide_flank and C_peptide_flank residues according to the flanking
278 residues at that locus. If there are multiple loci, choose the most
279 tryptic. (Use '-' if there is no flanking residue or it is unknown.)
282 locus_sequence_by_name = dict((locusname, seq)
283 for (locusname, defline, seq, filename)
284 in fasta_db)
286 def check_residue(r):
287 return r if r in RESIDUES else '-'
289 def count_tryptic_pairs(l):
290 return sum(1 for N,C in l if N in ('K', 'R', '-') and C not in ('P',))
292 for spectrum_best_matches in best_matches:
293 for m in spectrum_best_matches:
294 if not m['peptide_sequence']:
295 continue
296 N_residue = check_residue(m['peptide_sequence'][0])
297 C_residue = check_residue(m['peptide_sequence'][-1])
298 candidate_flanks = []
299 for locus_index in range(len(m['sequence_name'])):
300 l_sequence_name = m['sequence_name'][locus_index]
301 l_peptide_begin = m['peptide_begin'][locus_index]
303 locus_sequence = locus_sequence_by_name[l_sequence_name]
304 N_flank = (locus_sequence[l_peptide_begin-1]
305 if l_peptide_begin > 0 else '-')
306 peptide_end = l_peptide_begin+len(m['peptide_sequence'])
307 C_flank = (locus_sequence[peptide_end]
308 if peptide_end < len(locus_sequence) else '-')
309 candidate_flanks.append([(N_flank, N_residue),
310 (C_residue, C_flank)])
312 best_candidate = max(candidate_flanks, key=count_tryptic_pairs)
313 m['N_peptide_flank'] = best_candidate[0][0]
314 m['C_peptide_flank'] = best_candidate[-1][-1]
317 def pythonize_swig_object(o, only_fields=None, skip_fields=[]):
318 """Creates a pure Python copy of a SWIG object, so that it can be easily
319 pickled, or printed (for debugging purposes). Each SWIG object is
320 pythonized as a dictionary, for convenience. If provided, 'only_fields',
321 limits the copy to the list of fields specified. Otherwise,
322 'skip_fields' if given is a list of methods not to include (this helps
323 avoid infinite recursion). Callable sub-objects are skipped.
325 >>> pprint(pythonize_swig_object(cgreylag.score_stats(1, 1)))
326 {'best_matches': [[{'C_peptide_flank': '-',
327 'N_peptide_flank': '-',
328 'conjunct_index': -1,
329 'mass_regime_index': -1,
330 'mass_trace': [],
331 'pca_delta': 0.0,
332 'peptide_begin': [],
333 'peptide_sequence': '',
334 'predicted_parent_mass': 0.0,
335 'score': 0.0,
336 'sequence_name': [],
337 'spectrum_index': -1}]],
338 'candidate_spectrum_count': 0,
339 'evaluation_count': 0}
343 if isinstance(o, str):
344 return o
345 try:
346 len(o)
347 except TypeError:
348 pass
349 else:
350 return list(pythonize_swig_object(x, only_fields, skip_fields)
351 for x in o)
352 if hasattr(o, '__swig_getmethods__'):
353 s = {}
354 for a in o.__swig_getmethods__:
355 if (not callable(getattr(o, a))
356 and (only_fields != None and a in only_fields
357 or only_fields == None and a not in skip_fields)):
358 s[a] = pythonize_swig_object(getattr(o, a), only_fields,
359 skip_fields)
360 return s
361 return o
364 def results_dump(fasta_db, score_statistics, searchable_spectra):
365 """Return a result dict mapping spectrum names to (spectrum_metadata,
366 best_matches) pairs.
368 Unneeded fields are stripped. Some are always removed, while others are
369 removed if they have a value matching their default (0 or []).
370 Placeholders in the match list (which do not represent actual matches) are
371 also removed.
374 # The goal of stripping is not so much to reduce the size of the glw
375 # files--though it does do this--as to reduce the memory footprint of
376 # loading a merged glw file representing an entire run. It also probably
377 # speeds things up a bit.
379 r = {}
380 spectrum_metadata_fs = set(['id', 'name', 'file_id', 'mass', 'charge',
381 'total_ion_current', 'comparisons'])
382 py_s_spectra = pythonize_swig_object(searchable_spectra,
383 only_fields=spectrum_metadata_fs)
384 py_matches = pythonize_swig_object(score_statistics.best_matches,
385 skip_fields=['spectrum_index'])
386 assert len(py_s_spectra) == len(py_matches)
388 # FIX: doing this here because it's much easier after pythonizing
389 fix_up_flanking_residues(fasta_db, py_matches)
391 # note that both strip functions modify their argument
392 def strip_meta(meta):
393 # these fields are redundant, could strip them
394 #del meta['file_id']
395 #del meta['name']
396 #del meta['charge']
397 return meta
399 def strip_match(match):
400 assert len(match['peptide_begin']) == len(match['sequence_name'])
401 if len(match['peptide_begin']) == 0:
402 return None
403 if match['conjunct_index'] == 0:
404 del match['conjunct_index']
405 if match['mass_regime_index'] == 0:
406 del match['mass_regime_index']
407 if len(match['mass_trace']) == 0:
408 del match['mass_trace']
409 if match['pca_delta'] == 0:
410 del match['pca_delta']
411 return match
413 for sp_metadata, sp_matches in zip(py_s_spectra, py_matches):
414 sp_key = sp_metadata['id']
415 assert sp_key not in r, "duplicate spectrum id"
416 sp_matches_stripped = [ strip_match(m) for m in sp_matches ]
417 sp_matches_stripped = [ m for m in sp_matches_stripped if m != None ]
418 r[sp_key] = (strip_meta(sp_metadata), sp_matches_stripped)
420 return r
423 def set_parameters(arg, options):
424 global GLP
425 GLP = arg
426 fixed_mod_map = dict((r[3], r) for r in GLP["pervasive_mods"])
427 regime_manifest = initialize_spectrum_parameters(options, GLP,
428 GLP["mass_regimes"],
429 fixed_mod_map)
430 mod_conjunct_triples = get_mod_conjunct_triples(GLP["potential_mods"],
431 GLP["potential_mod_limit"],
432 GLP["mass_regimes"])
433 if len(mod_conjunct_triples) >= sys.maxint-8:
434 chase_error("too many conjunct triples")
435 GLP[">mod_conjunct_triples"] = mod_conjunct_triples
437 info("%s unique potential modification conjuncts",
438 len(mod_conjunct_triples))
439 debug("mod_conjunct_triples (unique):\n%s",
440 pformat(mod_conjunct_triples))
442 # (cleavage_re, position of cleavage in cleavage_re)
443 cleavage_pattern, cleavage_position \
444 = cleavage_motif_re(GLP["cleavage_motif"])
445 if cleavage_position == None:
446 chase_error("cleavage site '%s' is missing '|'",
447 GLP["protein, cleavage site"])
448 cleavage_pattern = re.compile(cleavage_pattern)
449 GLP[">cleavage_pattern"] = cleavage_pattern
450 GLP[">cleavage_position"] = cleavage_position
453 def set_sequences(arg):
454 assert arg[0] in ('name', 'value')
455 if arg[0] == 'name':
456 checked = [ (db, file_sha1(db)) for db, cksum in arg[1] ]
457 if checked != arg[1]:
458 chase_error("database checksum does not match [%s]",
459 (checked, arg[1]))
460 # [(locusname, defline, seq, filename), ...]
461 fasta_db = list(read_fasta_files([db for db, cksum in arg[1]]))
462 else:
463 fasta_db = arg[1]
464 # [(idno, offset, locusname, defline, seq, seq_filename), ...]
465 db = []
466 for idno, (locusname, defline, sequence, filename) in enumerate(fasta_db):
467 db.extend(split_sequence_into_aa_runs(idno, locusname, defline,
468 sequence, filename))
469 db_residue_count = sum(len(dbi[3]) for dbi in db)
471 info("read %s sequences (%s runs, %s residues)", len(fasta_db), len(db),
472 db_residue_count)
473 max_run_length = max(len(r[3]) for r in db)
474 info("max run length is %s residues", max_run_length)
475 if max_run_length > sys.maxint:
476 chase_error("runs longer than %s not yet supported", max_run_length)
477 if not db:
478 chase_error("no database sequences")
480 context = cgreylag.search_context()
481 context.nonspecific_cleavage = (GLP["cleavage_motif"] == "[X]|[X]")
482 for idno, offset, locusname, defline, seq, seq_filename in db:
483 cp = []
484 if not context.nonspecific_cleavage:
485 cp = list(generate_cleavage_points(GLP[">cleavage_pattern"],
486 GLP[">cleavage_position"], seq))
487 sr = cgreylag.sequence_run(idno, offset, seq, cp, locusname)
488 context.sequence_runs.append(sr)
489 context.maximum_missed_cleavage_sites = GLP["maximum_missed_cleavage_sites"]
490 return context, fasta_db
493 def make_swig_spectrum(py_spectrum):
494 """Given a spectrum tuple (file_index, physical_index, index, name, mass,
495 charge, peaks), return an equivalent SWIG spectrum object.
497 sp = cgreylag.spectrum(float(py_spectrum[4]), int(py_spectrum[5]))
498 sp.file_id, sp.physical_id, sp.id, sp.name = py_spectrum[:4]
500 peaks_as_string = py_spectrum[6]
501 assert peaks_as_string[-1] == '\n', "malformed spectrum file"
502 # ( (mz0, intensity0), (mz1, intensity1), ... )
503 peaks = tuple(tuple(float(v) for v in pkline.split(None, 1))
504 for pkline in peaks_as_string.split('\n')[:-1])
505 #debug("py_spectrum: %s" % (py_spectrum,))
506 #debug("peaks: %s" % (peaks,))
507 sp.set_peaks_from_matrix(peaks)
508 return sp
511 def set_spectra(arg):
512 # [ (file_index, physical_index, index, name, mass, charge, peaks), ... ]
513 py_spectra = arg
515 # FIX: how to handle input errors???
517 ### create SWIG spectrum objects, for C++ access
518 # FIX: are physical_index and index even needed?
519 debug("parsing %s spectra", len(py_spectra))
520 spectra = [ make_swig_spectrum(sp) for sp in py_spectra ]
521 debug("parsed")
522 spectra.sort(key=lambda x: x.mass)
524 # FIX: send stats back to master? move common code?
525 def peak_statistics(spectra):
526 counts = [ len(sp.peaks) for sp in spectra ]
527 counts.sort()
528 n = len(counts)
529 return (counts[0], counts[int(n*0.25)], counts[int(n*0.5)],
530 counts[int(n*0.75)], counts[-1], sum(counts) / float(n))
532 def print_spectrum_statistics(spectra):
533 info(" %s spectra (mass range %s - %s)", len(spectra),
534 spectra[0].mass, spectra[-1].mass)
535 info(" peak stats: %s..%s..%s..%s..%s (mean=%.2f)"
536 % peak_statistics(spectra))
538 if spectra:
539 info("read spectra:")
540 print_spectrum_statistics(spectra)
541 else:
542 warning("no input spectra")
544 # filter and normalize spectra
545 for sp in spectra:
546 sp.filter_peaks(GLP["TIC_cutoff_proportion"],
547 CP.parent_mass_tolerance_max)
548 sp.classify(GLP["intensity_class_count"], GLP["intensity_class_ratio"],
549 GLP["fragment_mass_tolerance"])
551 min_psm = GLP["min_parent_spectrum_mass"]
552 max_psm = GLP["max_parent_spectrum_mass"]
554 # search only spectra that are of good enough quality
555 # FIX: also filter by 1 + 2 + 4 rule?
556 good_spectra = [ sp for sp in spectra
557 if (len(sp.peaks) >= 10
558 and min_psm <= sp.mass <= max_psm) ]
559 noise_spectrums_ids = (set(sp.id for sp in spectra)
560 - set(sp.id for sp in good_spectra))
562 if good_spectra:
563 info("after filtering:")
564 print_spectrum_statistics(good_spectra)
565 else:
566 info("no spectra pass filters")
568 cgreylag.spectrum.set_searchable_spectra(good_spectra)
569 return noise_spectrums_ids
572 def perform_search(state):
574 results = { 'modification conjuncts' : GLP[">mod_conjunct_triples"],
575 'argv' : sys.argv,
576 'noise_spectrum_ids' : state['noise_spectrum_ids'],
577 'matches' : {}, 'total comparisons' : 0 }
579 spectra_count = len(cgreylag.cvar.spectrum_searchable_spectra)
581 if not spectra_count:
582 info("no spectra after filtering--search skipped")
583 return results
585 score_statistics = cgreylag.score_stats(spectra_count,
586 GLP["best_result_count"])
588 info("searching")
589 search_all(state['context'], GLP["potential_mod_limit"],
590 GLP[">mod_conjunct_triples"], score_statistics)
592 info("returning result")
593 results['matches'] = results_dump(state['fasta_db'], score_statistics,
594 cgreylag.cvar.spectrum_searchable_spectra)
595 results['total comparisons'] = score_statistics.candidate_spectrum_count
597 #debug("sending search results:\n%s" % pformat(results))
599 # if options.estimate_only:
600 # # this factor is just an empirical guess
601 # print ("%.2f generic CPU hours"
602 # % (score_statistics.candidate_spectrum_count / 439.0e6))
603 # return
605 return results
609 # inspired by Beazley's generator talk
610 def listen_for_connections(port, port_count):
611 """Listen on a port from the range [port, port+port_count) and yield a
612 series of connections."""
613 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
614 s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
616 for p in range(port, port+port_count):
617 try:
618 s.bind(('', p)) # socket.INADDR_ANY?
619 s.listen(5)
620 info("listening on port %s", p)
621 break
622 except socket.error, e:
623 if e[0] == errno.EADDRINUSE:
624 continue
625 raise
626 else:
627 error("could not listen, all specified ports in use")
629 while True:
630 client_socket, client_addr = s.accept()
631 info("received connection from %s", client_addr)
633 # try to better notice reboots, net failures, etc
634 try:
635 s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE,
636 s.getsockopt(socket.SOL_SOCKET,
637 socket.SO_KEEPALIVE) | 1)
638 except socket.error:
639 pass
641 yield client_socket
644 # FIX: give cleaner diagnostics on protocol failure?
647 def reset_state(state):
648 state.clear()
649 cgreylag.spectrum.set_searchable_spectra([])
652 def handle_command(options, state, command, arg):
653 """Handle command/arg from client, returning (response, response_arg) or
654 None. State that persists across commands is stored in 'state', or on the
655 C++ side with cgreylag.spectrum.set_searchable_spectra().
657 if command == 'parameters':
658 reset_state(state)
659 set_parameters(arg, options)
660 return None
661 elif command == 'sequences':
662 context, fasta_db = set_sequences(arg)
663 state['context'] = context
664 state['fasta_db'] = fasta_db
665 return None
666 elif command == 'spectra':
667 state['noise_spectrum_ids'] = set_spectra(arg)
668 return None
669 elif command == 'search':
670 #currently ignore arg
671 results = perform_search(state)
672 return 'found', results
673 else:
674 assert False, "unknown command '%s'" % command
677 def main(args=sys.argv[1:]):
678 parser = optparse.OptionParser(usage=
679 "usage: %prog [options]",
680 description=__doc__, version=VERSION)
681 pa = parser.add_option
682 DEFAULT_PORT = 10078
683 pa("--port", dest="port", type="int", default=DEFAULT_PORT,
684 help="first listener port [default=%s]" % DEFAULT_PORT)
685 DEFAULT_PORT_COUNT = 4
686 pa("--port-count", dest="port_count", type="int",
687 default=DEFAULT_PORT_COUNT,
688 help="number of ports to try [default=%s]" % DEFAULT_PORT_COUNT)
689 #DEFAULT_TIMEOUT = 600
690 #pa("--timeout", dest="timeout", type="int",
691 # default=DEFAULT_TIMEOUT,
692 # help="inactivity timeout [default=%ss]" % DEFAULT_TIMEOUT)
693 pa("-q", "--quiet", action="store_true", dest="quiet", help="no warnings")
694 pa("-p", "--show-progress", action="store_true", dest="show_progress",
695 help="show running progress")
696 pa("-v", "--verbose", action="store_true", dest="verbose",
697 help="be verbose")
698 pa("-l", "--logfile", dest="logfile",
699 help="log to FILE instead of stderr", metavar="FILE")
700 pa("--copyright", action="store_true", dest="copyright",
701 help="print copyright and exit")
702 pa("--debug", action="store_true", dest="debug",
703 help="output debugging info")
704 pa("--profile", action="store_true", dest="profile",
705 help="dump Python profiling output to './greylag-chase.prof.<pid>'")
706 (options, args) = parser.parse_args(args=args)
708 if options.copyright:
709 print __copyright__
710 sys.exit(0)
712 if len(args) > 0:
713 parser.print_help()
714 sys.exit(1)
716 set_logging(options)
718 info("starting on %s", socket.gethostname())
720 state = {}
721 for client in listen_for_connections(options.port, options.port_count):
722 try:
723 client_f_to = client.makefile('w')
724 client_f_from = client.makefile('r')
726 print >> client_f_to, "greylag %s ready" % VERSION
727 client_f_to.flush()
729 # new client--clear all prior state
730 reset_state(state)
732 while True:
733 command_line = client_f_from.readline()
734 if not command_line:
735 info("client closed connection")
736 break
737 command, arg_length = command_line.split()
738 arg = pickle.loads(client_f_from.read(int(arg_length)))
739 try:
740 r = handle_command(options, state, command, arg)
741 except ChaseException, e:
742 r = ('error', e)
743 if not r:
744 continue
745 response, response_arg = r
746 if response == 'error':
747 print >> client_f_to, 'error', response_arg
748 break
749 assert response == 'found'
750 p_response_arg = pickle.dumps(response_arg)
751 print >> client_f_to, 'found', len(p_response_arg)
752 client_f_to.write(p_response_arg)
753 client_f_to.flush()
754 except socket.error, e:
755 info("closing connection on error [%s]", e)
756 except Exception, e:
757 try: print >> client_f_to, ('error [%s "%s"]'
758 % (sys.exc_info()[0], e))
759 except: pass
760 raise
761 finally:
762 try: client_f_to.close()
763 except: pass
764 try: client_f_from.close()
765 except: pass
766 try: client.close()
767 except: pass
769 info("exiting")
772 if __name__ == '__main__':
773 # try:
774 # import psyco
775 # psyco.full()
776 # ###psyco.bind(formula_mass)
777 # warning('psyco enabled')
778 # except ImportError:
779 # pass
781 try:
782 if '--profile' in sys.argv:
783 import cProfile
784 import pstats
785 report_fn = "greylag-chase.prof.%s" % os.getpid()
786 data_fn = report_fn + ".tmp"
787 prof = cProfile.run('main()', data_fn)
788 with open(report_fn, 'w') as report_f:
789 try:
790 stats = pstats.Stats(data_fn, stream=report_f)
791 stats.strip_dirs()
792 stats.sort_stats('cumulative')
793 stats.print_stats(50)
794 stats.sort_stats('time')
795 stats.print_stats(50)
796 print "# profile report written to '%s'" % report_fn
797 finally:
798 try:
799 os.remove(data_fn)
800 except:
801 pass
802 else:
803 main()
804 except SystemExit:
805 raise
806 except:
807 logging.exception("unhandled exception")
808 sys.exit(1)
809 finally:
810 logging.shutdown()