trx_toolkit/trx_sniff.py: fix compatibility with Python 3
[osmocom-bb.git] / src / target / trx_toolkit / trx_sniff.py
blob6592455aed451c039181bf983da7586d2097217c
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
4 # TRX Toolkit
5 # Scapy-based TRX interface sniffer
7 # (C) 2018 by Vadim Yanitskiy <axilirator@gmail.com>
9 # All Rights Reserved
11 # This program is free software; you can redistribute it and/or modify
12 # it under the terms of the GNU General Public License as published by
13 # the Free Software Foundation; either version 2 of the License, or
14 # (at your option) any later version.
16 # This program is distributed in the hope that it will be useful,
17 # but WITHOUT ANY WARRANTY; without even the implied warranty of
18 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19 # GNU General Public License for more details.
21 # You should have received a copy of the GNU General Public License along
22 # with this program; if not, write to the Free Software Foundation, Inc.,
23 # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
25 APP_CR_HOLDERS = [("2018", "Vadim Yanitskiy <axilirator@gmail.com>")]
27 import logging as log
28 import argparse
29 import sys
31 import scapy.all
33 from app_common import ApplicationBase
34 from data_dump import DATADumpFile
35 from data_msg import *
37 class Application(ApplicationBase):
38 # Counters
39 cnt_burst_dropped_num = 0
40 cnt_burst_num = 0
42 cnt_frame_last = None
43 cnt_frame_num = 0
45 # Internal variables
46 lo_trigger = False
48 def __init__(self):
49 self.app_print_copyright(APP_CR_HOLDERS)
50 self.argv = self.parse_argv()
52 # Configure logging
53 self.app_init_logging(self.argv)
55 # Open requested capture file
56 if self.argv.output_file is not None:
57 self.ddf = DATADumpFile(self.argv.output_file)
59 def run(self):
60 # Compose a packet filter
61 pkt_filter = "udp and (port %d or port %d)" \
62 % (self.argv.base_port + 2, self.argv.base_port + 102)
64 log.info("Listening on interface '%s'..." % self.argv.sniff_if)
66 # Start sniffing...
67 scapy.all.sniff(iface = self.argv.sniff_if, store = 0,
68 filter = pkt_filter, prn = self.pkt_handler)
70 # Scapy registers its own signal handler
71 self.shutdown()
73 def pkt_handler(self, ether):
74 # Prevent loopback packet duplication
75 if self.argv.sniff_if == "lo":
76 self.lo_trigger = not self.lo_trigger
77 if not self.lo_trigger:
78 return
80 # Extract a TRX payload
81 ip = ether.payload
82 udp = ip.payload
83 trx = udp.payload
85 # Convert to bytearray
86 msg_raw = bytearray(trx.load)
88 # Determine a burst direction (L1 <-> TRX)
89 l12trx = udp.sport > udp.dport
91 # Create an empty DATA message
92 msg = DATAMSG_L12TRX() if l12trx else DATAMSG_TRX2L1()
94 # Attempt to parse the payload as a DATA message
95 try:
96 msg.parse_msg(msg_raw)
97 except:
98 log.warning("Failed to parse message, dropping...")
99 self.cnt_burst_dropped_num += 1
100 return
102 # Poke burst pass filter
103 rc = self.burst_pass_filter(l12trx, msg.fn, msg.tn)
104 if rc is False:
105 self.cnt_burst_dropped_num += 1
106 return
108 # Debug print
109 log.debug("%s burst: %s" \
110 % ("L1 -> TRX" if l12trx else "TRX -> L1", msg.desc_hdr()))
112 # Poke message handler
113 self.msg_handle(msg)
115 # Poke burst counter
116 rc = self.burst_count(msg.fn, msg.tn)
117 if rc is True:
118 self.shutdown()
120 def burst_pass_filter(self, l12trx, fn, tn):
121 # Direction filter
122 if self.argv.direction is not None:
123 if self.argv.direction == "TRX" and not l12trx:
124 return False
125 elif self.argv.direction == "L1" and l12trx:
126 return False
128 # Timeslot filter
129 if self.argv.pf_tn is not None:
130 if tn != self.argv.pf_tn:
131 return False
133 # Frame number filter
134 if self.argv.pf_fn_lt is not None:
135 if fn > self.argv.pf_fn_lt:
136 return False
137 if self.argv.pf_fn_gt is not None:
138 if fn < self.argv.pf_fn_gt:
139 return False
141 # Burst passed ;)
142 return True
144 def msg_handle(self, msg):
145 if self.argv.verbose:
146 print(msg.burst)
148 # Append a new message to the capture
149 if self.argv.output_file is not None:
150 self.ddf.append_msg(msg)
152 def burst_count(self, fn, tn):
153 # Update frame counter
154 if self.cnt_frame_last is None:
155 self.cnt_frame_last = fn
156 self.cnt_frame_num += 1
157 else:
158 if fn != self.cnt_frame_last:
159 self.cnt_frame_num += 1
161 # Update burst counter
162 self.cnt_burst_num += 1
164 # Stop sniffing after N bursts
165 if self.argv.burst_count is not None:
166 if self.cnt_burst_num == self.argv.burst_count:
167 log.info("Collected required amount of bursts")
168 return True
170 # Stop sniffing after N frames
171 if self.argv.frame_count is not None:
172 if self.cnt_frame_num == self.argv.frame_count:
173 log.info("Collected required amount of frames")
174 return True
176 return False
178 def shutdown(self):
179 log.info("Shutting down...")
181 # Print statistics
182 log.info("%u bursts handled, %u dropped" \
183 % (self.cnt_burst_num, self.cnt_burst_dropped_num))
185 # Exit
186 sys.exit(0)
188 def parse_argv(self):
189 parser = argparse.ArgumentParser(prog = "trx_sniff",
190 description = "Scapy-based TRX interface sniffer")
192 parser.add_argument("-v", "--verbose",
193 dest = "verbose", action = "store_true",
194 help = "Print burst bits to stdout")
196 # Register common logging options
197 self.app_reg_logging_options(parser)
199 trx_group = parser.add_argument_group("TRX interface")
200 trx_group.add_argument("-i", "--sniff-interface",
201 dest = "sniff_if", type = str, default = "lo", metavar = "IF",
202 help = "Set network interface (default '%(default)s')")
203 trx_group.add_argument("-p", "--base-port",
204 dest = "base_port", type = int, default = 6700,
205 help = "Set base port number (default %(default)s)")
206 trx_group.add_argument("-o", "--output-file", metavar = "FILE",
207 dest = "output_file", type = str,
208 help = "Write bursts to a capture file")
210 cnt_group = parser.add_argument_group("Count limitations (optional)")
211 cnt_group.add_argument("--frame-count", metavar = "N",
212 dest = "frame_count", type = int,
213 help = "Stop after sniffing N frames")
214 cnt_group.add_argument("--burst-count", metavar = "N",
215 dest = "burst_count", type = int,
216 help = "Stop after sniffing N bursts")
218 pf_group = parser.add_argument_group("Filtering (optional)")
219 pf_group.add_argument("--direction",
220 dest = "direction", type = str, choices = ["TRX", "L1"],
221 help = "Burst direction")
222 pf_group.add_argument("--timeslot", metavar = "TN",
223 dest = "pf_tn", type = int, choices = range(0, 8),
224 help = "TDMA timeslot number (equal TN)")
225 pf_group.add_argument("--frame-num-lt", metavar = "FN",
226 dest = "pf_fn_lt", type = int,
227 help = "TDMA frame number (lower than FN)")
228 pf_group.add_argument("--frame-num-gt", metavar = "FN",
229 dest = "pf_fn_gt", type = int,
230 help = "TDMA frame number (greater than FN)")
232 return parser.parse_args()
234 if __name__ == '__main__':
235 app = Application()
236 app.run()