#!/usr/bin/env python

'''This program does the actual search of mass spectra against a sequence
database. It listens for a connection from greylag-rally, which sends all of
the information necessary to do the search, and returns the search results on
the same connection. It will continue to process further search requests on
the connection, timing out if none are received within the specified interval.
Then new connections will be accepted, one at a time. If there are no new
connections within the specified interval, the program exits.

On startup, the program will attempt to listen on a sequence of ports,
depending on the values of --port and --port-count. If these are 12345 and 4,
for example, it will try ports 12345, 12346, 12347, and 12348, using the first
available. Since only one process is allowed to listen on each port, this
functions as a form of locking that will prevent more than --port-count
occurrences of this program from running on the host simultaneously.

The sequence databases are the only files this program accesses through the
filesystem; all other input and output takes place over the connection.
'''

from __future__ import with_statement


__copyright__ = '''
greylag, a collection of programs for MS/MS protein analysis
Copyright (C) 2006-2008 Stowers Institute for Medical Research

This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.

This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.

You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.

Contact: Mike Coleman
         Stowers Institute for Medical Research
         1000 East 50th Street
         Kansas City, Missouri 64110
         USA
'''


import cPickle as pickle
import errno
import logging; from logging import debug, info, warning
import optparse
import os
from pprint import pprint, pformat
import re
import socket
import sys

from greylag import *
import cgreylag


# gc possibly harms performance here, so disable it. gc only matters for
# cycles, which we (hope we) don't create. See the gc module docs.
import gc; gc.disable()


def error(s, *args):
    "fatal error --> exit with error"
    # if we're unit testing, just throw an exception
    if __name__ != "__main__":
        raise Exception((s + " (fatal error)") % args)
    logging.error(s, *args)
    sys.exit(1)


class ChaseException(Exception): pass

def chase_error(s, *args):
    "error --> disconnect from client"
    logging.error(s, *args)
    raise ChaseException((s + " (disconnecting)") % args)


# Try to drop dead immediately on SIGINT (control-C), instead of normal Python
# KeyboardInterrupt processing, since we may spend long periods of time in
# uninterruptible C++ calls.
try:
    import signal
    signal.signal(signal.SIGINT, signal.SIG_DFL)
    #signal.signal(signal.SIGPIPE, signal.SIG_DFL)
except:
    pass


# name -> value map of processed greylag config parameters
GLP = {}


def generate_cleavage_points(cleavage_re, cleavage_pos, sequence):
    """Yields the offsets of the cleavages in sequence. The endpoints are
    always included, by convention.

    >>> list(generate_cleavage_points(re.compile('[KR](?=[^P])'), 1,
    ...                               'ARBCKDEKPF'))
    [0, 2, 5, 10]

    """
    yield 0
    for m in cleavage_re.finditer(sequence):
        p = m.start() + cleavage_pos
        if 0 < p < len(sequence):
            yield p
    yield len(sequence)


# A possible PCA (pyrrolidone carboxyl acid) modification accounts for
# circularization of the peptide N-terminal. PCA mods are excluded if a
# static N-terminal mod has been specified. Likewise, choosing a PCA mod will
# exclude choosing a potential N-terminal mod. (The PCA mod for 'C' is
# dependent on a static mod of C+57 being in effect.)

def get_pca_table(mass_regimes):
    """Return a list of tuples (residues, parent delta, fragment delta)
    describing the PCA possibilities for each mass regime. Residues is a
    string specifying the residues this PCA delta applies to.
    """
    # FIX: According to Xtandem, C is only a candidate for PCA if
    # carboxyamidomethylated (C+57). Currently we always search it.
    if GLP["enable_pca_mods"]:
        return [ [('', 0, 0),
                  ('E', -1 * CP.parent_mass_regime[r].water_mass,
                   -1 * CP.fragment_mass_regime[r].water_mass),
                  ('QC', -1 * CP.parent_mass_regime[r].ammonia_mass,
                   -1 * CP.fragment_mass_regime[r].ammonia_mass)]
                 for r in range(len(mass_regimes)) ]
    return [ [('', 0, 0)] for r in range(len(mass_regimes)) ]
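
# With PCA mods enabled, the returned table looks roughly like this for a
# single mass regime (a sketch--the deltas shown are just the approximate
# monoisotopic water and ammonia masses; the real values come from the
# regime-specific masses in CP):
#
#     [[('', 0, 0), ('E', -18.0106, -18.0106), ('QC', -17.0265, -17.0265)]]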


# helper for generate_delta_bag_counts, below
def gen_delta_bag_counts(i, remainder, bag):
    if i < 1:
        assert i == 0
        bag[0] = remainder
        yield tuple(bag)
        return
    for delta in range(1, remainder-i+1):
        bag[i] = delta
        for x in gen_delta_bag_counts(i-1, remainder-delta, bag):
            yield x

def generate_delta_bag_counts(mod_count, conjunct_length):
    """Generate all tuples of positive integers having length conjunct_length
    and sum mod_count. As a special case, () is such a tuple having length 0
    and sum 0.

    >>> for i in range(6): print i, list(generate_delta_bag_counts(4, i))
    0 []
    1 [(4,)]
    2 [(3, 1), (2, 2), (1, 3)]
    3 [(2, 1, 1), (1, 2, 1), (1, 1, 2)]
    4 [(1, 1, 1, 1)]
    5 []
    >>> list(generate_delta_bag_counts(0, 0))
    [()]
    >>> list(generate_delta_bag_counts(0, 1))
    []

    """
    if conjunct_length == 0:
        return [()] if mod_count == 0 else []
    if mod_count < conjunct_length:
        return []
    return gen_delta_bag_counts(conjunct_length - 1, mod_count,
                                [0] * conjunct_length)


def set_context_conjuncts(context, mass_regime_index, R_cj):
    context.delta_bag_lookup.clear()
    context.delta_bag_lookup.resize(ord('Z')+1)
    context.delta_bag_delta.clear()
    for n, cj in enumerate(R_cj):
        context.delta_bag_delta.append(cj[6][mass_regime_index][1])
        for r in cj[3]:
            context.delta_bag_lookup[ord(r)] \
                = context.delta_bag_lookup[ord(r)] + (n,)
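
# After set_context_conjuncts runs, delta_bag_delta[n] holds the fragment
# mass delta of residue conjunct n for this regime, and
# delta_bag_lookup[ord(r)] is the tuple of conjunct indices applicable to
# residue r--so, hypothetically, delta_bag_lookup[ord('S')] == (0, 2) would
# mean that conjuncts 0 and 2 can modify serine.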


def search_all(context, mod_limit, mod_conjunct_triples, score_statistics):
    """Search sequence database against searchable spectra."""

    mass_regimes = GLP["mass_regimes"]
    pca_table = get_pca_table(mass_regimes)
    debug("pca_table: %s", pca_table)

    total_evaluation_count = 0

    for mod_count in range(mod_limit + 1):
        context.mod_count = mod_count
        for mr_index, (mass_regime, pca_entry) in enumerate(zip(mass_regimes,
                                                                pca_table)):
            context.mass_regime_index = mr_index
            for pca_res, pca_parent_delta, pca_frag_delta in pca_entry:
                context.pca_residues = pca_res
                context.pca_delta = pca_frag_delta
                for cji, (N_cj, C_cj, R_cj) in enumerate(mod_conjunct_triples):
                    if pca_res and N_cj:
                        continue        # mutually exclusive, for now
                    context.conjunct_index = cji
                    set_context_conjuncts(context, mr_index, R_cj)
                    debug("cj_triple: N=%s C=%s R=%s", N_cj, C_cj, R_cj)
                    for delta_bag in generate_delta_bag_counts(mod_count,
                                                               len(R_cj)):
                        # this clear() avoids a SWIG/STL bug!?
                        context.delta_bag_count.clear()
                        context.delta_bag_count[:] = delta_bag

                        parent_delta = sum(count*r_cj[6][mr_index][0]
                                           for count, r_cj
                                           in zip(delta_bag, R_cj))
                        debug("parent_delta: %s", parent_delta)

                        pmrf = CP.parent_mass_regime[mr_index].fixed_residue_mass
                        fmrf = CP.fragment_mass_regime[mr_index].fixed_residue_mass
                        p_fx = (pmrf[ord('[')]
                                + (N_cj[0][6][mr_index][0] if N_cj else 0)
                                + pmrf[ord(']')]
                                + (C_cj[0][6][mr_index][0] if C_cj else 0)
                                + parent_delta + pca_parent_delta + PROTON_MASS)
                        context.parent_fixed_mass = p_fx
                        f_N_fx = (fmrf[ord('[')]
                                  + (N_cj[0][6][mr_index][1] if N_cj else 0)
                                  + pca_frag_delta)
                        context.fragment_N_fixed_mass = f_N_fx
                        f_C_fx = (fmrf[ord(']')]
                                  + (C_cj[0][6][mr_index][1] if C_cj else 0)
                                  + CP.fragment_mass_regime[mr_index].water_mass)
                        context.fragment_C_fixed_mass = f_C_fx

                        info("MC=%s MR=%s PCA=%s CJ=%s DB=%s", mod_count,
                             mr_index, pca_res, cji, delta_bag)
                        debug("p_fx %s f_N_fx %s f_C_fx %s", p_fx, f_N_fx,
                              f_C_fx)

                        score_statistics.evaluation_count = 0
                        cgreylag.spectrum.search_runs(context,
                                                      score_statistics)
                        total_evaluation_count \
                            += score_statistics.evaluation_count
                        info("  %20s evaluations, this bag",
                             score_statistics.evaluation_count)

    info('%s candidate spectra examined',
         score_statistics.candidate_spectrum_count)
    info('%s total evaluations', total_evaluation_count)


AA_SEQUENCE = re.compile(r'[ARNDCQEGHILKMFPSTWYV]+')

def split_sequence_into_aa_runs(idno, locusname, defline, sequence, filename):
    """Return a list of tuples (idno, start, locusname, defline, seq,
    filename), one for each contiguous run of residues in sequence, where
    'start' is the position of 'seq' in 'sequence'.

    >>> pprint(split_sequence_into_aa_runs(123, 'ln', 'ln defline',
    ...                                    'STSS*DEFABA', 'filename'))
    [(123, 0, 'ln', 'ln defline', 'STSS', 'filename'),
     (123, 5, 'ln', 'ln defline', 'DEFA', 'filename'),
     (123, 10, 'ln', 'ln defline', 'A', 'filename')]

    """
    return [ (idno, m.start(), locusname, defline, m.group(), filename)
             for n, m in enumerate(AA_SEQUENCE.finditer(sequence)) ]


def fix_up_flanking_residues(fasta_db, best_matches):
    """For each peptide match, choose one of the match loci and update the
    N_peptide_flank and C_peptide_flank residues according to the flanking
    residues at that locus. If there are multiple loci, choose the most
    tryptic. (Use '-' if there is no flanking residue or it is unknown.)
    """

    locus_sequence_by_name = dict((locusname, seq)
                                  for (locusname, defline, seq, filename)
                                  in fasta_db)

    def check_residue(r):
        return r if r in RESIDUES else '-'

    def count_tryptic_pairs(l):
        return sum(1 for N,C in l if N in ('K', 'R', '-') and C not in ('P',))
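
    # For example, a fully tryptic match like K.DEFK.A scores 2:
    #     count_tryptic_pairs([('K', 'D'), ('K', 'A')]) == 2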

    for spectrum_best_matches in best_matches:
        for m in spectrum_best_matches:
            if not m['peptide_sequence']:
                continue
            N_residue = check_residue(m['peptide_sequence'][0])
            C_residue = check_residue(m['peptide_sequence'][-1])
            candidate_flanks = []
            for locus_index in range(len(m['sequence_name'])):
                l_sequence_name = m['sequence_name'][locus_index]
                l_peptide_begin = m['peptide_begin'][locus_index]

                locus_sequence = locus_sequence_by_name[l_sequence_name]
                N_flank = (locus_sequence[l_peptide_begin-1]
                           if l_peptide_begin > 0 else '-')
                peptide_end = l_peptide_begin+len(m['peptide_sequence'])
                C_flank = (locus_sequence[peptide_end]
                           if peptide_end < len(locus_sequence) else '-')
                candidate_flanks.append([(N_flank, N_residue),
                                         (C_residue, C_flank)])

            best_candidate = max(candidate_flanks, key=count_tryptic_pairs)
            m['N_peptide_flank'] = best_candidate[0][0]
            m['C_peptide_flank'] = best_candidate[-1][-1]


def pythonize_swig_object(o, only_fields=None, skip_fields=[]):
    """Creates a pure Python copy of a SWIG object, so that it can be easily
    pickled, or printed (for debugging purposes). Each SWIG object is
    pythonized as a dictionary, for convenience. If provided, 'only_fields'
    limits the copy to the list of fields specified. Otherwise,
    'skip_fields', if given, is a list of methods not to include (this helps
    avoid infinite recursion). Callable sub-objects are skipped.

    >>> pprint(pythonize_swig_object(cgreylag.score_stats(1, 1)))
    {'best_matches': [[{'C_peptide_flank': '-',
                        'N_peptide_flank': '-',
                        'conjunct_index': -1,
                        'mass_regime_index': -1,
                        'mass_trace': [],
                        'pca_delta': 0.0,
                        'peptide_begin': [],
                        'peptide_sequence': '',
                        'predicted_parent_mass': 0.0,
                        'score': 0.0,
                        'sequence_name': [],
                        'spectrum_index': -1}]],
     'candidate_spectrum_count': 0,
     'evaluation_count': 0}

    """
    if isinstance(o, str):
        return o
    try:
        len(o)
    except TypeError:
        pass
    else:
        return list(pythonize_swig_object(x, only_fields, skip_fields)
                    for x in o)
    if hasattr(o, '__swig_getmethods__'):
        s = {}
        for a in o.__swig_getmethods__:
            if (not callable(getattr(o, a))
                and (only_fields != None and a in only_fields
                     or only_fields == None and a not in skip_fields)):
                s[a] = pythonize_swig_object(getattr(o, a), only_fields,
                                             skip_fields)
        return s
    return o


def results_dump(fasta_db, score_statistics, searchable_spectra):
    """Return a result dict mapping spectrum ids to (spectrum_metadata,
    best_matches) pairs.

    Unneeded fields are stripped. Some are always removed, while others are
    removed if they have a value matching their default (0 or []).
    Placeholders in the match list (which do not represent actual matches) are
    also removed.
    """

    # The goal of stripping is not so much to reduce the size of the glw
    # files--though it does do this--as to reduce the memory footprint of
    # loading a merged glw file representing an entire run. It also probably
    # speeds things up a bit.

    r = {}
    spectrum_metadata_fs = set(['id', 'name', 'file_id', 'mass', 'charge',
                                'total_ion_current', 'comparisons'])
    py_s_spectra = pythonize_swig_object(searchable_spectra,
                                         only_fields=spectrum_metadata_fs)
    py_matches = pythonize_swig_object(score_statistics.best_matches,
                                       skip_fields=['spectrum_index'])
    assert len(py_s_spectra) == len(py_matches)

    # FIX: doing this here because it's much easier after pythonizing
    fix_up_flanking_residues(fasta_db, py_matches)

    # note that this function modifies its argument
    def strip_match(match):
        if not match['peptide_sequence']:
            return None                 # placeholder, not an actual match
        assert len(match['peptide_begin']) == len(match['sequence_name'])
        match['loci'] = zip(match['sequence_name'], match['peptide_begin'])
        del match['peptide_begin']
        del match['sequence_name']

        if match['conjunct_index'] == 0:
            del match['conjunct_index']
        if match['mass_regime_index'] == 0:
            del match['mass_regime_index']
        if len(match['mass_trace']) == 0:
            del match['mass_trace']
        if match['pca_delta'] == 0:
            del match['pca_delta']
        return match

    for sp_metadata, sp_matches in zip(py_s_spectra, py_matches):
        sp_key = sp_metadata['id']
        assert sp_key not in r, "duplicate spectrum id '%s'" % sp_key
        sp_matches_stripped = [ strip_match(m) for m in sp_matches ]
        sp_matches_stripped = [ m for m in sp_matches_stripped if m != None ]
        r[sp_key] = (sp_metadata, sp_matches_stripped)

    return r
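
# A sketch of the mapping results_dump returns (values illustrative;
# default-valued fields such as 'mass_trace' may be absent after stripping):
#
#     { spectrum_id: ({'id': ..., 'name': ..., 'mass': ..., 'charge': ...},
#                     [{'score': ..., 'peptide_sequence': ...,
#                       'loci': [(sequence_name, peptide_begin), ...]},
#                      ...]) }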


def set_parameters(arg, options):
    global GLP
    GLP = arg
    fixed_mod_map = dict((r[3], r) for r in GLP["pervasive_mods"])
    regime_manifest = initialize_spectrum_parameters(options, GLP,
                                                     GLP["mass_regimes"],
                                                     fixed_mod_map)
    mod_conjunct_triples = get_mod_conjunct_triples(GLP["potential_mods"],
                                                    GLP["potential_mod_limit"],
                                                    GLP["mass_regimes"])
    if len(mod_conjunct_triples) >= sys.maxint-8:
        chase_error("too many conjunct triples")
    GLP[">mod_conjunct_triples"] = mod_conjunct_triples

    info("%s unique potential modification conjuncts",
         len(mod_conjunct_triples))
    debug("mod_conjunct_triples (unique):\n%s",
          pformat(mod_conjunct_triples))

    # (cleavage_re, position of cleavage in cleavage_re)
    cleavage_pattern, cleavage_position \
        = cleavage_motif_re(GLP["cleavage_motif"])
    if cleavage_position == None:
        chase_error("cleavage motif '%s' is missing '|'",
                    GLP["cleavage_motif"])
    cleavage_pattern = re.compile(cleavage_pattern)
    GLP[">cleavage_pattern"] = cleavage_pattern
    GLP[">cleavage_position"] = cleavage_position


def set_sequences(arg):
    assert arg[0] in ('name', 'value')
    if arg[0] == 'name':
        checked = [ (db, file_sha1(db)) for db, cksum in arg[1] ]
        if checked != arg[1]:
            chase_error("database checksum does not match [%s]",
                        (checked, arg[1]))
        # [(locusname, defline, seq, filename), ...]
        fasta_db = list(read_fasta_files([db for db, cksum in arg[1]]))
    else:
        fasta_db = arg[1]
    # [(idno, offset, locusname, defline, seq, seq_filename), ...]
    db = []
    for idno, (locusname, defline, sequence, filename) in enumerate(fasta_db):
        db.extend(split_sequence_into_aa_runs(idno, locusname, defline,
                                              sequence, filename))
    if not db:
        chase_error("no database sequences")
    db_residue_count = sum(len(dbi[4]) for dbi in db)

    info("read %s sequences (%s runs, %s residues)", len(fasta_db), len(db),
         db_residue_count)
    max_run_length = max(len(r[4]) for r in db)
    info("max run length is %s residues", max_run_length)
    if max_run_length > sys.maxint:
        chase_error("runs longer than %s residues not yet supported",
                    sys.maxint)

    context = cgreylag.search_context()
    context.nonspecific_cleavage = (GLP["cleavage_motif"] == "[X]|[X]")
    for idno, offset, locusname, defline, seq, seq_filename in db:
        cp = []
        if not context.nonspecific_cleavage:
            cp = list(generate_cleavage_points(GLP[">cleavage_pattern"],
                                               GLP[">cleavage_position"],
                                               seq))
        sr = cgreylag.sequence_run(idno, offset, seq, cp, locusname)
        context.sequence_runs.append(sr)
    context.maximum_missed_cleavage_sites = GLP["maximum_missed_cleavage_sites"]

    return context, fasta_db


def make_swig_spectrum(py_spectrum):
    """Given a spectrum tuple (file_index, physical_index, index, name, mass,
    charge, peaks), return an equivalent SWIG spectrum object.
    """
    sp = cgreylag.spectrum(float(py_spectrum[4]), int(py_spectrum[5]))
    sp.file_id, sp.physical_id, sp.id, sp.name = py_spectrum[:4]

    peaks_as_string = py_spectrum[6]
    assert peaks_as_string[-1] == '\n', "malformed spectrum file"
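    # The peaks string is a series of newline-terminated "mz intensity"
    # lines; for example (hypothetical values):
    #     '123.4 10.5\n234.5 20.25\n' -> ((123.4, 10.5), (234.5, 20.25))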
    # ( (mz0, intensity0), (mz1, intensity1), ... )
    peaks = tuple(tuple(float(v) for v in pkline.split(None, 1))
                  for pkline in peaks_as_string.split('\n')[:-1])
    #debug("py_spectrum: %s" % (py_spectrum,))
    #debug("peaks: %s" % (peaks,))
    sp.set_peaks_from_matrix(peaks)
    return sp


def set_spectra(arg):
    # [ (file_index, physical_index, index, name, mass, charge, peaks), ... ]
    py_spectra = arg

    # FIX: how to handle input errors???

    ### create SWIG spectrum objects, for C++ access
    # FIX: are physical_index and index even needed?
    debug("parsing %s spectra", len(py_spectra))
    spectra = [ make_swig_spectrum(sp) for sp in py_spectra ]
    debug("parsed")
    spectra.sort(key=lambda x: x.mass)

    # FIX: send stats back to master? move common code?
    def peak_statistics(spectra):
        counts = [ len(sp.peaks) for sp in spectra ]
        counts.sort()
        n = len(counts)
        return (counts[0], counts[int(n*0.25)], counts[int(n*0.5)],
                counts[int(n*0.75)], counts[-1], sum(counts) / float(n))

    def print_spectrum_statistics(spectra):
        info("  %s spectra (mass range %s - %s)", len(spectra),
             spectra[0].mass, spectra[-1].mass)
        info("  peak stats: %s..%s..%s..%s..%s (mean=%.2f)"
             % peak_statistics(spectra))

    if spectra:
        info("read spectra:")
        print_spectrum_statistics(spectra)
    else:
        warning("no input spectra")

    # filter and normalize spectra
    for sp in spectra:
        sp.filter_peaks(GLP["TIC_cutoff_proportion"],
                        CP.parent_mass_tolerance_max)
        sp.classify(GLP["intensity_class_count"], GLP["intensity_class_ratio"],
                    GLP["fragment_mass_tolerance"])

    min_psm = GLP["min_parent_spectrum_mass"]
    max_psm = GLP["max_parent_spectrum_mass"]

    # search only spectra that are of good enough quality
    # FIX: also filter by 1 + 2 + 4 rule?
    good_spectra = [ sp for sp in spectra
                     if (len(sp.peaks) >= 10
                         and min_psm <= sp.mass <= max_psm) ]
    noise_spectrum_ids = (set(sp.id for sp in spectra)
                          - set(sp.id for sp in good_spectra))

    if good_spectra:
        info("after filtering:")
        print_spectrum_statistics(good_spectra)
    else:
        info("no spectra pass filters")

    cgreylag.spectrum.set_searchable_spectra(good_spectra)
    return noise_spectrum_ids


def perform_search(state):
    results = { 'modification conjuncts' : GLP[">mod_conjunct_triples"],
                'argv' : sys.argv,
                'noise_spectrum_ids' : state['noise_spectrum_ids'],
                'matches' : {}, 'total comparisons' : 0 }

    spectra_count = len(cgreylag.cvar.spectrum_searchable_spectra)

    if not spectra_count:
        info("no spectra after filtering--search skipped")
        return results

    score_statistics = cgreylag.score_stats(spectra_count,
                                            GLP["best_result_count"])

    info("searching")
    search_all(state['context'], GLP["potential_mod_limit"],
               GLP[">mod_conjunct_triples"], score_statistics)

    info("returning result")
    results['matches'] = results_dump(state['fasta_db'], score_statistics,
                                      cgreylag.cvar.spectrum_searchable_spectra)
    results['total comparisons'] = score_statistics.candidate_spectrum_count

    #debug("sending search results:\n%s" % pformat(results))

    # if options.estimate_only:
    #     # this factor is just an empirical guess
    #     print ("%.2f generic CPU hours"
    #            % (score_statistics.candidate_spectrum_count / 439.0e6))
    #     return

    return results


# inspired by Beazley's generator talk
def listen_for_connections(port, port_count):
    """Listen on a port from the range [port, port+port_count) and yield a
    series of connections."""
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

    for p in range(port, port+port_count):
        try:
            s.bind(('', p))             # socket.INADDR_ANY?
            s.listen(5)
            info("listening on port %s", p)
            break
        except socket.error, e:
            if e[0] == errno.EADDRINUSE:
                continue
            raise
    else:
        error("could not listen, all specified ports in use")

    while True:
        client_socket, client_addr = s.accept()
        info("received connection from %s", client_addr)

        # enable keepalive on the accepted socket, to try to better notice
        # reboots, net failures, etc
        try:
            client_socket.setsockopt(
                socket.SOL_SOCKET, socket.SO_KEEPALIVE,
                client_socket.getsockopt(socket.SOL_SOCKET,
                                         socket.SO_KEEPALIVE) | 1)
        except socket.error:
            pass

        yield client_socket
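
# A usage sketch (port values hypothetical; main() below does exactly this
# with the --port options): bind the first free port in [10078, 10082) and
# then handle clients one at a time.
#
#     for client in listen_for_connections(10078, 4):
#         ...converse with client, close it, then wait for the next one...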


# FIX: give cleaner diagnostics on protocol failure?

def reset_state(state):
    state.clear()
    cgreylag.spectrum.set_searchable_spectra([])


def handle_command(options, state, command, arg):
    """Handle command/arg from client, returning (response, response_arg) or
    None. State that persists across commands is stored in 'state', or on the
    C++ side with cgreylag.spectrum.set_searchable_spectra().
    """
    if command == 'parameters':
        reset_state(state)
        set_parameters(arg, options)
        return None
    elif command == 'sequences':
        context, fasta_db = set_sequences(arg)
        state['context'] = context
        state['fasta_db'] = fasta_db
        return None
    elif command == 'spectra':
        state['noise_spectrum_ids'] = set_spectra(arg)
        return None
    elif command == 'search':
        # arg is currently ignored
        results = perform_search(state)
        return 'found', results
    else:
        assert False, "unknown command '%s'" % command
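
# A sketch of the wire protocol, as implemented in main() below. Each request
# is a line "<command> <pickled-arg-length>" followed by that many bytes of
# pickle; only 'search' produces a reply:
#
#     server:  greylag <version> ready
#     client:  parameters <n1>   + <n1> bytes of pickle
#     client:  sequences <n2>    + <n2> bytes of pickle
#     client:  spectra <n3>      + <n3> bytes of pickle
#     client:  search <n4>       + <n4> bytes of pickle
#     server:  found <n5>        + <n5> bytes of pickle   (or "error ...")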


def main(args=sys.argv[1:]):
    parser = optparse.OptionParser(usage="usage: %prog [options]",
                                   description=__doc__, version=VERSION)
    pa = parser.add_option
    DEFAULT_PORT = 10078
    pa("--port", dest="port", type="int", default=DEFAULT_PORT,
       help="first listener port [default=%s]" % DEFAULT_PORT)
    DEFAULT_PORT_COUNT = 4
    pa("--port-count", dest="port_count", type="int",
       default=DEFAULT_PORT_COUNT,
       help="number of ports to try [default=%s]" % DEFAULT_PORT_COUNT)
    #DEFAULT_TIMEOUT = 600
    #pa("--timeout", dest="timeout", type="int",
    #   default=DEFAULT_TIMEOUT,
    #   help="inactivity timeout [default=%ss]" % DEFAULT_TIMEOUT)
    pa("-q", "--quiet", action="store_true", dest="quiet", help="no warnings")
    pa("-p", "--show-progress", action="store_true", dest="show_progress",
       help="show running progress")
    pa("-v", "--verbose", action="store_true", dest="verbose",
       help="be verbose")
    pa("-l", "--logfile", dest="logfile",
       help="log to FILE instead of stderr", metavar="FILE")
    pa("--copyright", action="store_true", dest="copyright",
       help="print copyright and exit")
    pa("--debug", action="store_true", dest="debug",
       help="output debugging info")
    pa("--profile", action="store_true", dest="profile",
       help="dump Python profiling output to './greylag-chase.prof.<pid>'")
    (options, args) = parser.parse_args(args=args)

    if options.copyright:
        print __copyright__
        sys.exit(0)

    if len(args) > 0:
        parser.print_help()
        sys.exit(1)

    set_logging(options)

    info("starting on %s", socket.gethostname())

    state = {}
    for client in listen_for_connections(options.port, options.port_count):
        try:
            client_f_to = client.makefile('w')
            client_f_from = client.makefile('r')

            print >> client_f_to, "greylag %s ready" % VERSION
            client_f_to.flush()

            # new client--clear all prior state
            reset_state(state)

            while True:
                command_line = client_f_from.readline()
                if not command_line:
                    info("client closed connection")
                    break
                command, arg_length = command_line.split()
                arg = pickle.loads(client_f_from.read(int(arg_length)))
                try:
                    r = handle_command(options, state, command, arg)
                except ChaseException, e:
                    r = ('error', e)
                if not r:
                    continue
                response, response_arg = r
                if response == 'error':
                    print >> client_f_to, 'error', response_arg
                    break
                assert response == 'found'
                p_response_arg = pickle.dumps(response_arg)
                print >> client_f_to, 'found', len(p_response_arg)
                client_f_to.write(p_response_arg)
                client_f_to.flush()
        except socket.error, e:
            info("closing connection on error [%s]", e)
        except Exception, e:
            try: print >> client_f_to, ('error [%s "%s"]'
                                        % (sys.exc_info()[0], e))
            except: pass
            raise
        finally:
            try: client_f_to.close()
            except: pass
            try: client_f_from.close()
            except: pass
            try: client.close()
            except: pass

    info("exiting")


if __name__ == '__main__':
#     try:
#         import psyco
#         psyco.full()
#         ###psyco.bind(formula_mass)
#         warning('psyco enabled')
#     except ImportError:
#         pass

    try:
        if '--profile' in sys.argv:
            import cProfile
            import pstats
            report_fn = "greylag-chase.prof.%s" % os.getpid()
            data_fn = report_fn + ".tmp"
            cProfile.run('main()', data_fn)
            with open(report_fn, 'w') as report_f:
                try:
                    stats = pstats.Stats(data_fn, stream=report_f)
                    stats.strip_dirs()
                    stats.sort_stats('cumulative')
                    stats.print_stats(50)
                    stats.sort_stats('time')
                    stats.print_stats(50)
                    print "# profile report written to '%s'" % report_fn
                finally:
                    try:
                        os.remove(data_fn)
                    except:
                        pass
        else:
            main()
    except SystemExit:
        raise
    except:
        logging.exception("unhandled exception")
        sys.exit(1)
    finally:
        logging.shutdown()