trx_toolkit/fake_trx.py: refactor L12TRX -> TRX2L1 burst transformation
[osmocom-bb.git] / src / target / trx_toolkit / fake_trx.py
blobf0dc5a520ff71cea32ef4dbfcbbabbb70406810f
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
4 # TRX Toolkit
5 # Virtual Um-interface (fake transceiver)
7 # (C) 2017-2019 by Vadim Yanitskiy <axilirator@gmail.com>
9 # All Rights Reserved
11 # This program is free software; you can redistribute it and/or modify
12 # it under the terms of the GNU General Public License as published by
13 # the Free Software Foundation; either version 2 of the License, or
14 # (at your option) any later version.
16 # This program is distributed in the hope that it will be useful,
17 # but WITHOUT ANY WARRANTY; without even the implied warranty of
18 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19 # GNU General Public License for more details.
21 # You should have received a copy of the GNU General Public License along
22 # with this program; if not, write to the Free Software Foundation, Inc.,
23 # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
# Copyright holders as (years, author) pairs; handed over
# to app_print_copyright() when the application starts.
APP_CR_HOLDERS = [
    ("2017-2019", "Vadim Yanitskiy <axilirator@gmail.com>"),
]
27 import logging as log
28 import signal
29 import argparse
30 import random
31 import select
32 import sys
33 import re
35 from app_common import ApplicationBase
36 from burst_fwd import BurstForwarder
37 from transceiver import Transceiver
38 from data_msg import Modulation
39 from clck_gen import CLCKGen
40 from trx_list import TRXList
41 from fake_pm import FakePM
42 from gsm_shared import *
class FakeTRX(Transceiver):
    """ Fake transceiver with RF path (burst loss, RSSI, TA, ToA) simulation.

    == ToA / RSSI measurement simulation

    Since this is a virtual environment, we can simulate different
    parameters of the physical RF interface:

      - ToA (Timing of Arrival) - measured difference between expected
        and actual time of burst arrival in units of 1/256 of GSM symbol
        periods. A pair of both base and threshold values defines a range
        of ToA value randomization:

          from (toa256_base - toa256_rand_threshold)
            to (toa256_base + toa256_rand_threshold).

      - RSSI (Received Signal Strength Indication) - measured "power" of
        the signal (per burst) in dBm. A pair of both base and threshold
        values defines a range of RSSI value randomization:

          from (rssi_base - rssi_rand_threshold)
            to (rssi_base + rssi_rand_threshold).

      - C/I (Carrier-to-Interference ratio) - value in cB (centiBels),
        computed from the training sequence of each received burst, by
        comparing the "ideal" training sequence with the actual one.
        A pair of both base and threshold values defines a range of
        C/I randomization:

          from (ci_base - ci_rand_threshold)
            to (ci_base + ci_rand_threshold).

    Please note that the randomization is optional and disabled by default.

    == Timing Advance handling

    The BTS is using ToA measurements for UL bursts in order to calculate
    Timing Advance value, that is then indicated to a MS, which in its turn
    shall apply this value to the transmitted signal in order to compensate
    the delay. Basically, every burst is transmitted in advance defined by
    the indicated Timing Advance value. The valid range is 0..63, where
    each unit means one GSM symbol advance. The actual Timing Advance value
    is set using SETTA control command from MS. By default, it's set to 0.

    == Path loss simulation

    === Burst dropping

    In some cases, e.g. due to a weak signal or high interference, a burst
    can be lost, i.e. not detected by the receiver. This can also be
    simulated using FAKE_DROP command on the control interface:

      - burst_drop_amount - the amount of DL/UL bursts
        to be dropped (i.e. not forwarded towards the MS/BTS),

      - burst_drop_period - drop a DL/UL burst if its (fn % period) == 0.

    == Configuration

    All simulation parameters mentioned above can be changed at runtime
    using the commands with prefix 'FAKE_' on the control interface.
    All of them are handled by our custom CTRL command handler.

    Command arguments are validated before anything is applied: a
    non-integer argument, a negative randomization threshold, a negative
    FAKE_DROP amount, or a non-positive FAKE_DROP period makes the
    handler return -1 and leaves the current configuration untouched.

    """

    TOA256_BASE_DEFAULT = 0
    RSSI_BASE_DEFAULT = -60
    CI_BASE_DEFAULT = 90

    # Randomizable parameters configurable over the control interface:
    # (CTRL command, name of the base attribute, name of the threshold
    # attribute). The order defines the order of command matching.
    _SIM_PARAMS = (
        ("FAKE_TOA", "toa256_base", "toa256_rand_threshold"),
        ("FAKE_RSSI", "rssi_base", "rssi_rand_threshold"),
        ("FAKE_CI", "ci_base", "ci_rand_threshold"),
    )

    def __init__(self, *trx_args, **trx_kwargs):
        """ Init the base transceiver (all arguments are passed through
        unchanged) and reset simulation parameters to their defaults. """
        Transceiver.__init__(self, *trx_args, **trx_kwargs)

        # Actual ToA, RSSI, C/I, TA values
        self.toa256_base = self.TOA256_BASE_DEFAULT
        self.rssi_base = self.RSSI_BASE_DEFAULT
        self.ci_base = self.CI_BASE_DEFAULT
        self.ta = 0

        # ToA, RSSI, C/I randomization thresholds
        self.toa256_rand_threshold = 0
        self.rssi_rand_threshold = 0
        self.ci_rand_threshold = 0

        # Path loss simulation (burst dropping)
        self.burst_drop_amount = 0
        self.burst_drop_period = 1

    # Returns the given base value as-is if the threshold is zero
    # (randomization disabled), otherwise a random integer from
    # the inclusive range [base - threshold, base + threshold]
    @staticmethod
    def _randomize(base, threshold):
        if threshold == 0:
            return base

        return random.randint(base - threshold, base + threshold)

    @property
    def toa256(self):
        """ ToA value for the next burst (optionally randomized). """
        return self._randomize(self.toa256_base, self.toa256_rand_threshold)

    @property
    def rssi(self):
        """ RSSI value for the next burst (optionally randomized). """
        return self._randomize(self.rssi_base, self.rssi_rand_threshold)

    @property
    def ci(self):
        """ C/I value for the next burst (optionally randomized). """
        return self._randomize(self.ci_base, self.ci_rand_threshold)

    # Path loss simulation: burst dropping
    # Returns: True - drop, False - keep
    def sim_burst_drop(self, msg):
        # Check if dropping is required
        if self.burst_drop_amount == 0:
            return False

        # Only bursts matching the configured period are dropped,
        # each dropped burst decreases the remaining amount
        if msg.fn % self.burst_drop_period == 0:
            log.info("(%s) Simulation: dropping burst (fn=%u %% %u == 0)",
                self, msg.fn, self.burst_drop_period)
            self.burst_drop_amount -= 1
            return True

        return False

    # Fills in the header fields that only exist
    # in version 0x01 (and later) of TRX2L1 messages
    def _handle_data_msg_v1(self, src_msg, msg):
        # TODO: NOPE indications are not (yet) supported
        msg.nope_ind = False

        # C/I (Carrier-to-Interference ratio)
        msg.ci = self.ci

        # Pick modulation type by burst length
        bl = len(src_msg.burst)
        msg.mod_type = Modulation.pick_by_bl(bl)

        # Pick TSC (Training Sequence Code) and TSC set
        if msg.mod_type is Modulation.ModGMSK:
            ss = TrainingSeqGMSK.pick(src_msg.burst)
            # Fall back to zeroes if no training sequence was picked
            msg.tsc = ss.tsc if ss is not None else 0
            msg.tsc_set = ss.tsc_set if ss is not None else 0
        else: # TODO: other modulation types (at least 8-PSK)
            msg.tsc_set = 0
            msg.tsc = 0

    # Takes (partially initialized) TRX2L1 message,
    # simulates RF path parameters (such as RSSI),
    # and sends towards the L1
    def handle_data_msg(self, src_trx, src_msg, msg):
        # Complete message header
        msg.toa256 = self.toa256
        msg.rssi = self.rssi

        # Version specific fields
        if msg.ver >= 0x01:
            self._handle_data_msg_v1(src_msg, msg)

        # Apply optional Timing Advance of the *originating* transceiver
        # (one TA unit is one symbol period, i.e. 256 ToA units)
        if src_trx.ta != 0:
            msg.toa256 -= src_trx.ta * 256

        # Path loss simulation
        if self.sim_burst_drop(msg):
            return

        # TODO: make legacy mode configurable (via argv?)
        self.data_if.send_msg(msg, legacy = True)

    # Converts the given CTRL command arguments to integers.
    # Returns a list of integers, or None (after logging an
    # error) if at least one of the arguments is malformed.
    def _parse_ints(self, cmd, *values):
        try:
            return [int(value) for value in values]
        except ValueError:
            log.error("(%s) %s: malformed integer argument(s)", self, cmd)
            return None

    # Simulation specific CTRL command handler
    # Returns: 0 - success, -1 - invalid argument(s),
    # None - the command is not handled here
    def ctrl_cmd_handler(self, request):
        # Timing Advance
        # Syntax: CMD SETTA <TA>
        if self.ctrl_if.verify_cmd(request, "SETTA", 1):
            log.debug("(%s) Recv SETTA cmd", self)

            args = self._parse_ints("SETTA", request[1])
            if args is None:
                return -1

            # Store indicated value
            self.ta = args[0]
            return 0

        # ToA / RSSI / C/I simulation
        for (cmd, base_attr, thresh_attr) in self._SIM_PARAMS:
            # Absolute form: CMD <cmd> <BASE> <THRESH>
            if self.ctrl_if.verify_cmd(request, cmd, 2):
                log.debug("(%s) Recv %s cmd", self, cmd)

                args = self._parse_ints(cmd, request[1], request[2])
                if args is None:
                    return -1

                # A negative threshold would define an empty range,
                # and thus break generation of every subsequent value
                (base, thresh) = args
                if thresh < 0:
                    log.error("(%s) %s threshold shall not "
                        "be negative", self, cmd)
                    return -1

                # Apply both base and threshold
                setattr(self, base_attr, base)
                setattr(self, thresh_attr, thresh)
                return 0

            # Relative form: CMD <cmd> <+-BASE_DELTA>
            if self.ctrl_if.verify_cmd(request, cmd, 1):
                log.debug("(%s) Recv %s cmd", self, cmd)

                args = self._parse_ints(cmd, request[1])
                if args is None:
                    return -1

                # Apply delta
                setattr(self, base_attr, getattr(self, base_attr) + args[0])
                return 0

        # Path loss simulation: burst dropping
        # Syntax: CMD FAKE_DROP <AMOUNT>
        # Dropping pattern: fn % 1 == 0
        if self.ctrl_if.verify_cmd(request, "FAKE_DROP", 1):
            log.debug("(%s) Recv FAKE_DROP cmd", self)

            # Parse / validate amount of bursts
            args = self._parse_ints("FAKE_DROP", request[1])
            if args is None:
                return -1

            num = args[0]
            if num < 0:
                log.error("(%s) FAKE_DROP amount shall not "
                    "be negative", self)
                return -1

            self.burst_drop_amount = num
            self.burst_drop_period = 1
            return 0

        # Path loss simulation: burst dropping
        # Syntax: CMD FAKE_DROP <AMOUNT> <FN_PERIOD>
        # Dropping pattern: fn % period == 0
        if self.ctrl_if.verify_cmd(request, "FAKE_DROP", 2):
            log.debug("(%s) Recv FAKE_DROP cmd", self)

            args = self._parse_ints("FAKE_DROP", request[1], request[2])
            if args is None:
                return -1

            # Validate amount of bursts
            (num, period) = args
            if num < 0:
                log.error("(%s) FAKE_DROP amount shall not "
                    "be negative", self)
                return -1

            # Validate period (it is used as a modulo divisor)
            if period <= 0:
                log.error("(%s) FAKE_DROP period shall "
                    "be greater than zero", self)
                return -1

            self.burst_drop_amount = num
            self.burst_drop_period = period
            return 0

        # Unhandled command
        return None
class Application(ApplicationBase):
    """ FakeTRX application: sets up the transceivers described by the
    command line arguments, and forwards bursts between them. """

    def __init__(self):
        self.app_print_copyright(APP_CR_HOLDERS)
        self.argv = self.parse_argv()

        # Set up signal handlers
        signal.signal(signal.SIGINT, self.sig_handler)

        # Configure logging
        self.app_init_logging(self.argv)

        # List of all transceivers
        self.trx_list = TRXList()

        # Init shared clock generator
        self.clck_gen = CLCKGen([])

        # Power measurement emulation
        # Noise: -120 .. -105
        # BTS: -75 .. -50
        self.fake_pm = FakePM(-120, -105, -75, -50)
        self.fake_pm.trx_list = self.trx_list

        # Init TRX instance for BTS
        self.append_trx(self.argv.bts_addr,
            self.argv.bts_base_port, name = "BTS")

        # Init TRX instance for BB
        self.append_trx(self.argv.bb_addr,
            self.argv.bb_base_port, name = "MS")

        # Additional transceivers (optional)
        if self.argv.trx_list is not None:
            for trx_def in self.argv.trx_list:
                (name, addr, port, idx) = trx_def
                self.append_child_trx(addr, port, idx, name)

        # Burst forwarding between transceivers
        self.burst_fwd = BurstForwarder(self.trx_list)

        log.info("Init complete")

    # Creates a new transceiver attached to the shared
    # clock generator, and adds it to the list
    def append_trx(self, remote_addr, base_port, name = None):
        trx = FakeTRX(self.argv.trx_bind_addr, remote_addr, base_port,
            clck_gen = self.clck_gen, pwr_meas = self.fake_pm,
            name = name)
        self.trx_list.add_trx(trx)

    # Creates a new 'child' transceiver, and links it with its 'parent',
    # i.e. an already existing transceiver with matching remote address
    # and base port. Raises IndexError if there is no such transceiver.
    def append_child_trx(self, remote_addr, base_port, child_idx, name = None):
        # Index 0 corresponds to the first transceiver
        if child_idx == 0:
            self.append_trx(remote_addr, base_port, name)
            return

        # Find 'parent' transceiver for a new child
        trx_parent = self.trx_list.find_trx(remote_addr, base_port)
        if trx_parent is None:
            raise IndexError("Couldn't find parent transceiver "
                "for '%s:%d/%d'" % (remote_addr, base_port, child_idx))

        # Allocate a new child
        trx_child = FakeTRX(self.argv.trx_bind_addr, remote_addr, base_port,
            child_idx = child_idx, pwr_meas = self.fake_pm, name = name)
        self.trx_list.add_trx(trx_child)

        # Link a new 'child' with its 'parent'
        trx_parent.child_trx_list.add_trx(trx_child)

    # Main loop: waits for data on the sockets of all
    # transceivers and dispatches it. Never returns.
    def run(self):
        # Compose list of to be monitored sockets
        sock_list = []
        for trx in self.trx_list:
            sock_list.append(trx.ctrl_if.sock)
            sock_list.append(trx.data_if.sock)

        # Enter main loop
        while True:
            # Wait until we get any data on any socket
            r_event, _, _ = select.select(sock_list, [], [])

            # Iterate over all transceivers
            for trx in self.trx_list:
                # DATA interface
                if trx.data_if.sock in r_event:
                    msg = trx.recv_data_msg()
                    if msg is not None:
                        self.burst_fwd.forward_msg(trx, msg)

                # CTRL interface
                if trx.ctrl_if.sock in r_event:
                    trx.ctrl_if.handle_rx()

    def shutdown(self):
        log.info("Shutting down...")

        # Stop clock generator
        self.clck_gen.stop()

    # Parses a TRX definition of the following
    # format: [NAME@]REMOTE_ADDR:BIND_PORT[/TRX_NUM]
    # e.g. [2001:0db8:85a3:0000:0000:8a2e:0370:7334]:5700/5
    # e.g. 127.0.0.1:5700 or 127.0.0.1:5700/1
    # e.g. foo@127.0.0.1:5700 or bar@127.0.0.1:5700/1
    # Returns a tuple (name, addr, port, idx), where name is None
    # if omitted, and idx is 0 if omitted. Raises ArgumentTypeError
    # if the given value does not match the format as a whole.
    @staticmethod
    def trx_def(val):
        try:
            # The pattern is anchored at the end, so a definition with
            # trailing garbage (e.g. '127.0.0.1:5700/abc') is rejected
            # rather than silently parsed as TRX index 0
            result = re.match(r"(.+@)?(.+):([0-9]+)(/[0-9]+)?$", val)
            (name, addr, port, idx) = result.groups()
        except (AttributeError, TypeError):
            # AttributeError: no match (result is None)
            # TypeError: the given value is not a string
            raise argparse.ArgumentTypeError("Invalid TRX definition: %s" % val)

        # Cut '/' from TRX index
        if idx is not None:
            idx = int(idx[1:])
        else:
            idx = 0

        # Cut '@' from TRX name
        if name is not None:
            name = name[:-1]

        return (name, addr, int(port), idx)

    # Parses command line arguments, and terminates the
    # application (via parser.error) if they are inconsistent
    def parse_argv(self):
        parser = argparse.ArgumentParser(prog = "fake_trx",
            description = "Virtual Um-interface (fake transceiver)")

        # Register common logging options
        self.app_reg_logging_options(parser)

        trx_group = parser.add_argument_group("TRX interface")
        trx_group.add_argument("-b", "--trx-bind-addr",
            dest = "trx_bind_addr", type = str, default = "0.0.0.0",
            help = "Set FakeTRX bind address (default %(default)s)")
        trx_group.add_argument("-R", "--bts-addr",
            dest = "bts_addr", type = str, default = "127.0.0.1",
            help = "Set BTS remote address (default %(default)s)")
        trx_group.add_argument("-r", "--bb-addr",
            dest = "bb_addr", type = str, default = "127.0.0.1",
            help = "Set BB remote address (default %(default)s)")
        trx_group.add_argument("-P", "--bts-base-port",
            dest = "bts_base_port", type = int, default = 5700,
            help = "Set BTS base port number (default %(default)s)")
        trx_group.add_argument("-p", "--bb-base-port",
            dest = "bb_base_port", type = int, default = 6700,
            help = "Set BB base port number (default %(default)s)")

        mtrx_group = parser.add_argument_group("Additional transceivers")
        mtrx_group.add_argument("--trx",
            metavar = "REMOTE_ADDR:BASE_PORT[/TRX_NUM]",
            dest = "trx_list", type = self.trx_def, action = "append",
            help = "Add a transceiver for BTS or MS (e.g. 127.0.0.1:5703)")

        argv = parser.parse_args()

        # Make sure there is no overlap between ports
        if argv.bts_base_port == argv.bb_base_port:
            parser.error("BTS and BB base ports shall be different")

        return argv

    # SIGINT handler: shuts the application down and exits
    def sig_handler(self, signum, frame):
        log.info("Signal %d received" % signum)
        if signum == signal.SIGINT:
            self.shutdown()
            sys.exit(0)
# Script entry point: set up the application
# and enter its (endless) main loop
if __name__ == "__main__":
    app = Application()
    app.run()