2 # -*- coding: utf-8 -*-
5 # Scapy-based TRX interface sniffer
7 # (C) 2018 by Vadim Yanitskiy <axilirator@gmail.com>
11 # This program is free software; you can redistribute it and/or modify
12 # it under the terms of the GNU General Public License as published by
13 # the Free Software Foundation; either version 2 of the License, or
14 # (at your option) any later version.
16 # This program is distributed in the hope that it will be useful,
17 # but WITHOUT ANY WARRANTY; without even the implied warranty of
18 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19 # GNU General Public License for more details.
21 # You should have received a copy of the GNU General Public License along
22 # with this program; if not, write to the Free Software Foundation, Inc.,
23 # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
25 APP_CR_HOLDERS
= [("2018", "Vadim Yanitskiy <axilirator@gmail.com>")]
33 from app_common
import ApplicationBase
34 from data_dump
import DATADumpFile
35 from data_msg
import *
37 class Application(ApplicationBase
):
39 cnt_burst_dropped_num
= 0
49 self
.app_print_copyright(APP_CR_HOLDERS
)
50 self
.argv
= self
.parse_argv()
53 self
.app_init_logging(self
.argv
)
55 # Open requested capture file
56 if self
.argv
.output_file
is not None:
57 self
.ddf
= DATADumpFile(self
.argv
.output_file
)
60 # Compose a packet filter
61 pkt_filter
= "udp and (port %d or port %d)" \
62 % (self
.argv
.base_port
+ 2, self
.argv
.base_port
+ 102)
64 log
.info("Listening on interface '%s'..." % self
.argv
.sniff_if
)
67 scapy
.all
.sniff(iface
= self
.argv
.sniff_if
, store
= 0,
68 filter = pkt_filter
, prn
= self
.pkt_handler
)
70 # Scapy registers its own signal handler
73 def pkt_handler(self
, ether
):
74 # Prevent loopback packet duplication
75 if self
.argv
.sniff_if
== "lo":
76 self
.lo_trigger
= not self
.lo_trigger
77 if not self
.lo_trigger
:
80 # Extract a TRX payload
85 # Convert to bytearray
86 msg_raw
= bytearray(trx
.load
)
88 # Determine a burst direction (L1 <-> TRX)
89 l12trx
= udp
.sport
> udp
.dport
91 # Create an empty DATA message
92 msg
= DATAMSG_L12TRX() if l12trx
else DATAMSG_TRX2L1()
94 # Attempt to parse the payload as a DATA message
96 msg
.parse_msg(msg_raw
)
98 log
.warning("Failed to parse message, dropping...")
99 self
.cnt_burst_dropped_num
+= 1
102 # Poke burst pass filter
103 rc
= self
.burst_pass_filter(l12trx
, msg
.fn
, msg
.tn
)
105 self
.cnt_burst_dropped_num
+= 1
109 log
.debug("%s burst: %s" \
110 % ("L1 -> TRX" if l12trx
else "TRX -> L1", msg
.desc_hdr()))
112 # Poke message handler
116 rc
= self
.burst_count(msg
.fn
, msg
.tn
)
120 def burst_pass_filter(self
, l12trx
, fn
, tn
):
122 if self
.argv
.direction
is not None:
123 if self
.argv
.direction
== "TRX" and not l12trx
:
125 elif self
.argv
.direction
== "L1" and l12trx
:
129 if self
.argv
.pf_tn
is not None:
130 if tn
!= self
.argv
.pf_tn
:
133 # Frame number filter
134 if self
.argv
.pf_fn_lt
is not None:
135 if fn
> self
.argv
.pf_fn_lt
:
137 if self
.argv
.pf_fn_gt
is not None:
138 if fn
< self
.argv
.pf_fn_gt
:
144 def msg_handle(self
, msg
):
145 if self
.argv
.verbose
:
148 # Append a new message to the capture
149 if self
.argv
.output_file
is not None:
150 self
.ddf
.append_msg(msg
)
152 def burst_count(self
, fn
, tn
):
153 # Update frame counter
154 if self
.cnt_frame_last
is None:
155 self
.cnt_frame_last
= fn
156 self
.cnt_frame_num
+= 1
158 if fn
!= self
.cnt_frame_last
:
159 self
.cnt_frame_num
+= 1
161 # Update burst counter
162 self
.cnt_burst_num
+= 1
164 # Stop sniffing after N bursts
165 if self
.argv
.burst_count
is not None:
166 if self
.cnt_burst_num
== self
.argv
.burst_count
:
167 log
.info("Collected required amount of bursts")
170 # Stop sniffing after N frames
171 if self
.argv
.frame_count
is not None:
172 if self
.cnt_frame_num
== self
.argv
.frame_count
:
173 log
.info("Collected required amount of frames")
179 log
.info("Shutting down...")
182 log
.info("%u bursts handled, %u dropped" \
183 % (self
.cnt_burst_num
, self
.cnt_burst_dropped_num
))
def parse_argv(self):
    """Declare the sniffer's command line options and parse them.

    Besides the options declared here, the common logging options are
    added by ``self.app_reg_logging_options()``.

    Returns:
        The namespace produced by ``ArgumentParser.parse_args()``, with
        the following attributes set by this method:

        * ``verbose`` -- flag, print burst bits to stdout;
        * ``sniff_if`` -- network interface name (default ``"lo"``);
        * ``base_port`` -- base port number (default ``6700``);
        * ``output_file`` -- capture file path, ``None`` if not given;
        * ``frame_count`` / ``burst_count`` -- optional stop limits;
        * ``direction`` -- optional, either ``"TRX"`` or ``"L1"``;
        * ``pf_tn`` -- optional timeslot number in range 0..7;
        * ``pf_fn_lt`` / ``pf_fn_gt`` -- optional frame number bounds.
    """
    cli = argparse.ArgumentParser(
        prog="trx_sniff",
        description="Scapy-based TRX interface sniffer",
    )

    cli.add_argument(
        "-v", "--verbose",
        dest="verbose",
        action="store_true",
        help="Print burst bits to stdout",
    )

    # Register common logging options
    self.app_reg_logging_options(cli)

    # Where to listen and where to store the capture
    iface_opts = cli.add_argument_group("TRX interface")
    iface_opts.add_argument(
        "-i", "--sniff-interface",
        dest="sniff_if",
        type=str,
        default="lo",
        metavar="IF",
        help="Set network interface (default '%(default)s')",
    )
    iface_opts.add_argument(
        "-p", "--base-port",
        dest="base_port",
        type=int,
        default=6700,
        help="Set base port number (default %(default)s)",
    )
    iface_opts.add_argument(
        "-o", "--output-file",
        metavar="FILE",
        dest="output_file",
        type=str,
        help="Write bursts to a capture file",
    )

    # Optional limits on how much traffic to collect
    limit_opts = cli.add_argument_group("Count limitations (optional)")
    limit_opts.add_argument(
        "--frame-count",
        metavar="N",
        dest="frame_count",
        type=int,
        help="Stop after sniffing N frames",
    )
    limit_opts.add_argument(
        "--burst-count",
        metavar="N",
        dest="burst_count",
        type=int,
        help="Stop after sniffing N bursts",
    )

    # Optional burst pass filter criteria
    filter_opts = cli.add_argument_group("Filtering (optional)")
    filter_opts.add_argument(
        "--direction",
        dest="direction",
        type=str,
        choices=["TRX", "L1"],
        help="Burst direction",
    )
    filter_opts.add_argument(
        "--timeslot",
        metavar="TN",
        dest="pf_tn",
        type=int,
        choices=range(0, 8),
        help="TDMA timeslot number (equal TN)",
    )
    filter_opts.add_argument(
        "--frame-num-lt",
        metavar="FN",
        dest="pf_fn_lt",
        type=int,
        help="TDMA frame number (lower than FN)",
    )
    filter_opts.add_argument(
        "--frame-num-gt",
        metavar="FN",
        dest="pf_fn_gt",
        type=int,
        help="TDMA frame number (greater than FN)",
    )

    return cli.parse_args()
234 if __name__
== '__main__':