2 # -*- coding: utf-8 -*-
5 # Virtual Um-interface (fake transceiver)
7 # (C) 2017-2019 by Vadim Yanitskiy <axilirator@gmail.com>
11 # This program is free software; you can redistribute it and/or modify
12 # it under the terms of the GNU General Public License as published by
13 # the Free Software Foundation; either version 2 of the License, or
14 # (at your option) any later version.
16 # This program is distributed in the hope that it will be useful,
17 # but WITHOUT ANY WARRANTY; without even the implied warranty of
18 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19 # GNU General Public License for more details.
21 # You should have received a copy of the GNU General Public License along
22 # with this program; if not, write to the Free Software Foundation, Inc.,
23 # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
# Copyright holders passed to app_print_copyright() on start-up:
# a list of (years, holder) tuples.
APP_CR_HOLDERS = [("2017-2019", "Vadim Yanitskiy <axilirator@gmail.com>")]
import argparse
import logging as log
import random
import re
import select
import signal
import sys

from app_common import ApplicationBase
from burst_fwd import BurstForwarder
from transceiver import Transceiver
from data_msg import Modulation
from clck_gen import CLCKGen
from trx_list import TRXList
from fake_pm import FakePM
from gsm_shared import *
class FakeTRX(Transceiver):
    """ Fake transceiver with RF path (burst loss, RSSI, TA, ToA) simulation.

    == ToA / RSSI measurement simulation ==

    Since this is a virtual environment, we can simulate different
    parameters of the physical RF interface:

      - ToA (Timing of Arrival) - measured difference between expected
        and actual time of burst arrival in units of 1/256 of GSM symbol
        periods. A pair of both base and threshold values defines a range
        of ToA value randomization:

          from (toa256_base - toa256_rand_threshold)
            to (toa256_base + toa256_rand_threshold).

      - RSSI (Received Signal Strength Indication) - measured "power" of
        the signal (per burst) in dBm. A pair of both base and threshold
        values defines a range of RSSI value randomization:

          from (rssi_base - rssi_rand_threshold)
            to (rssi_base + rssi_rand_threshold).

      - C/I (Carrier-to-Interference ratio) - value in cB (centiBels),
        computed from the training sequence of each received burst, by
        comparing the "ideal" training sequence with the actual one.
        A pair of both base and threshold values defines a range of
        C/I randomization:

          from (ci_base - ci_rand_threshold)
            to (ci_base + ci_rand_threshold).

    Please note that the randomization is optional and disabled by default.

    == Timing Advance handling ==

    The BTS is using ToA measurements for UL bursts in order to calculate
    Timing Advance value, that is then indicated to a MS, which in its turn
    shall apply this value to the transmitted signal in order to compensate
    the delay. Basically, every burst is transmitted in advance defined by
    the indicated Timing Advance value. The valid range is 0..63, where
    each unit means one GSM symbol advance. The actual Timing Advance value
    is set using SETTA control command from MS. By default, it's set to 0.

    == Path loss simulation ==

    In some cases, e.g. due to a weak signal or high interference, a burst
    can be lost, i.e. not detected by the receiver. This can also be
    simulated using FAKE_DROP command on the control interface:

      - burst_drop_amount - the amount of DL/UL bursts
        to be dropped (i.e. not forwarded towards the MS/BTS),

      - burst_drop_period - drop a DL/UL burst if its (fn % period) == 0.

    All simulation parameters mentioned above can be changed at runtime
    using the commands with prefix 'FAKE_' on the control interface.
    All of them are handled by our custom CTRL command handler.

    """

    TOA256_BASE_DEFAULT = 0
    RSSI_BASE_DEFAULT = -60
    # NOTE(review): __init__() reads CI_BASE_DEFAULT, but its definition is
    # not visible in the reviewed source. 90 cB is assumed -- please confirm.
    CI_BASE_DEFAULT = 90

    def __init__(self, *trx_args, **trx_kwargs):
        """Initialize the base transceiver and reset simulation parameters.

        All positional and keyword arguments are passed through to
        Transceiver.__init__() unchanged.
        """
        Transceiver.__init__(self, *trx_args, **trx_kwargs)

        # Actual ToA, RSSI, C/I, TA values
        self.toa256_base = self.TOA256_BASE_DEFAULT
        self.rssi_base = self.RSSI_BASE_DEFAULT
        self.ci_base = self.CI_BASE_DEFAULT
        # Timing Advance is 0 until changed by the SETTA command
        self.ta = 0

        # ToA, RSSI, C/I randomization thresholds (0 means disabled)
        self.toa256_rand_threshold = 0
        self.rssi_rand_threshold = 0
        self.ci_rand_threshold = 0

        # Path loss simulation (burst dropping)
        self.burst_drop_amount = 0
        self.burst_drop_period = 1

    @staticmethod
    def _randomize(base, threshold):
        """Return base, or a random integer within base +/- threshold.

        A zero threshold disables randomization. The absolute value of
        the threshold is used, so that a negative value configured over
        the control interface cannot produce an empty range, which
        random.randint() rejects with ValueError.
        """
        if threshold == 0:
            return base

        spread = abs(threshold)
        return random.randint(base - spread, base + spread)

    @property
    def toa256(self):
        """Simulated ToA value (randomized if a threshold is set)."""
        return self._randomize(self.toa256_base, self.toa256_rand_threshold)

    @property
    def rssi(self):
        """Simulated RSSI value (randomized if a threshold is set)."""
        return self._randomize(self.rssi_base, self.rssi_rand_threshold)

    @property
    def ci(self):
        """Simulated C/I value (randomized if a threshold is set)."""
        return self._randomize(self.ci_base, self.ci_rand_threshold)

    def sim_burst_drop(self, msg):
        """Path loss simulation: decide whether a burst shall be dropped.

        Decrements burst_drop_amount every time a burst is dropped.

        Returns: True - drop, False - keep
        """
        # Check if dropping is required
        if self.burst_drop_amount == 0:
            return False

        if msg.fn % self.burst_drop_period == 0:
            log.info("(%s) Simulation: dropping burst (fn=%u %% %u == 0)"
                % (self, msg.fn, self.burst_drop_period))
            self.burst_drop_amount -= 1
            return True

        return False

    def _handle_data_msg_v1(self, src_msg, msg):
        """Fill in the message fields specific to header version 1.

        src_msg is the message the burst originates from (only its burst
        is read here), msg is the message being prepared for sending.
        """
        # TODO: NOPE indications are not (yet) supported
        # NOTE(review): the statement following this TODO is not visible in
        # the reviewed source; the 'nope_ind' field name is assumed.
        msg.nope_ind = False

        # C/I (Carrier-to-Interference ratio)
        msg.ci = self.ci

        # Pick modulation type by burst length
        bl = len(src_msg.burst)
        msg.mod_type = Modulation.pick_by_bl(bl)

        # Pick TSC (Training Sequence Code) and TSC set
        if msg.mod_type is Modulation.ModGMSK:
            ss = TrainingSeqGMSK.pick(src_msg.burst)
            msg.tsc = ss.tsc if ss is not None else 0
            msg.tsc_set = ss.tsc_set if ss is not None else 0
        else: # TODO: other modulation types (at least 8-PSK)
            # NOTE(review): the body of this branch is not visible in the
            # reviewed source; zeros mirror the GMSK fallback above.
            msg.tsc_set = 0
            msg.tsc = 0

    def handle_data_msg(self, src_trx, src_msg, msg):
        """Complete a (partially initialized) TRX2L1 message and send it.

        Simulates RF path parameters (such as RSSI), applies the Timing
        Advance of the originating transceiver (src_trx), and sends the
        message towards the L1, unless burst dropping applies.
        """
        # Complete message header
        msg.toa256 = self.toa256
        msg.rssi = self.rssi

        # Version specific fields
        # NOTE(review): the exact condition is not visible in the reviewed
        # source; 'msg.ver >= 0x01' is assumed -- please confirm.
        if msg.ver >= 0x01:
            self._handle_data_msg_v1(src_msg, msg)

        # Apply optional Timing Advance
        if src_trx.ta != 0:
            msg.toa256 -= src_trx.ta * 256

        # Path loss simulation
        if self.sim_burst_drop(msg):
            return

        # TODO: make legacy mode configurable (via argv?)
        self.data_if.send_msg(msg, legacy = True)

    def ctrl_cmd_handler(self, request):
        """Simulation specific CTRL command handler.

        NOTE(review): the return statements are not visible in the
        reviewed source. The convention assumed here is: 0 - command
        handled, -1 - invalid argument, None - unknown command. Please
        confirm against the code that calls this handler.
        """
        # Timing Advance
        # Syntax: CMD SETTA <TA>
        if self.ctrl_if.verify_cmd(request, "SETTA", 1):
            log.debug("(%s) Recv SETTA cmd" % self)

            # Store indicated value
            self.ta = int(request[1])
            return 0

        # Timing of Arrival simulation
        # Absolute form: CMD FAKE_TOA <BASE> <THRESH>
        elif self.ctrl_if.verify_cmd(request, "FAKE_TOA", 2):
            log.debug("(%s) Recv FAKE_TOA cmd" % self)

            # Parse and apply both base and threshold
            self.toa256_base = int(request[1])
            self.toa256_rand_threshold = int(request[2])
            return 0

        # Timing of Arrival simulation
        # Relative form: CMD FAKE_TOA <+-BASE_DELTA>
        elif self.ctrl_if.verify_cmd(request, "FAKE_TOA", 1):
            log.debug("(%s) Recv FAKE_TOA cmd" % self)

            # Parse and apply delta
            self.toa256_base += int(request[1])
            return 0

        # RSSI simulation
        # Absolute form: CMD FAKE_RSSI <BASE> <THRESH>
        elif self.ctrl_if.verify_cmd(request, "FAKE_RSSI", 2):
            log.debug("(%s) Recv FAKE_RSSI cmd" % self)

            # Parse and apply both base and threshold
            self.rssi_base = int(request[1])
            self.rssi_rand_threshold = int(request[2])
            return 0

        # RSSI simulation
        # Relative form: CMD FAKE_RSSI <+-BASE_DELTA>
        elif self.ctrl_if.verify_cmd(request, "FAKE_RSSI", 1):
            log.debug("(%s) Recv FAKE_RSSI cmd" % self)

            # Parse and apply delta
            self.rssi_base += int(request[1])
            return 0

        # C/I simulation
        # Absolute form: CMD FAKE_CI <BASE> <THRESH>
        elif self.ctrl_if.verify_cmd(request, "FAKE_CI", 2):
            log.debug("(%s) Recv FAKE_CI cmd" % self)

            # Parse and apply both base and threshold
            self.ci_base = int(request[1])
            self.ci_rand_threshold = int(request[2])
            return 0

        # C/I simulation
        # Relative form: CMD FAKE_CI <+-BASE_DELTA>
        elif self.ctrl_if.verify_cmd(request, "FAKE_CI", 1):
            log.debug("(%s) Recv FAKE_CI cmd" % self)

            # Parse and apply delta
            self.ci_base += int(request[1])
            return 0

        # Path loss simulation: burst dropping
        # Syntax: CMD FAKE_DROP <AMOUNT>
        # Dropping pattern: fn % 1 == 0
        elif self.ctrl_if.verify_cmd(request, "FAKE_DROP", 1):
            log.debug("(%s) Recv FAKE_DROP cmd" % self)

            # Parse / validate amount of bursts
            num = int(request[1])
            if num < 0:
                log.error("(%s) FAKE_DROP amount shall not "
                    "be negative" % self)
                return -1

            self.burst_drop_amount = num
            self.burst_drop_period = 1
            return 0

        # Path loss simulation: burst dropping
        # Syntax: CMD FAKE_DROP <AMOUNT> <FN_PERIOD>
        # Dropping pattern: fn % period == 0
        elif self.ctrl_if.verify_cmd(request, "FAKE_DROP", 2):
            log.debug("(%s) Recv FAKE_DROP cmd" % self)

            # Parse / validate amount of bursts
            num = int(request[1])
            if num < 0:
                log.error("(%s) FAKE_DROP amount shall not "
                    "be negative" % self)
                return -1

            # Parse / validate period (used as a modulo divisor,
            # so zero must never be stored)
            period = int(request[2])
            if period <= 0:
                log.error("(%s) FAKE_DROP period shall "
                    "be greater than zero" % self)
                return -1

            self.burst_drop_amount = num
            self.burst_drop_period = period
            return 0

        # Unhandled command
        return None
class Application(ApplicationBase):
    """Command line application: sets up fake transceivers and forwards
    bursts between them until interrupted."""

    def __init__(self):
        """Parse the command line and create all transceivers.

        NOTE(review): the 'def' line of this method is not visible in
        the reviewed source; a constructor without arguments is assumed.
        """
        self.app_print_copyright(APP_CR_HOLDERS)
        self.argv = self.parse_argv()

        # Set up signal handlers
        signal.signal(signal.SIGINT, self.sig_handler)

        # Configure logging
        self.app_init_logging(self.argv)

        # List of all transceivers
        self.trx_list = TRXList()

        # Init shared clock generator
        self.clck_gen = CLCKGen([])

        # Power measurement emulation
        # Noise: -120 .. -105
        # NOTE(review): the last two values (-75, -50) are presumably
        # the BTS power range -- confirm against FakePM
        self.fake_pm = FakePM(-120, -105, -75, -50)
        self.fake_pm.trx_list = self.trx_list

        # Init TRX instance for BTS
        self.append_trx(self.argv.bts_addr,
            self.argv.bts_base_port, name = "BTS")

        # Init TRX instance for BB
        self.append_trx(self.argv.bb_addr,
            self.argv.bb_base_port, name = "MS")

        # Additional transceivers (optional)
        if self.argv.trx_list is not None:
            for trx_def in self.argv.trx_list:
                (name, addr, port, idx) = trx_def
                self.append_child_trx(addr, port, idx, name)

        # Burst forwarding between transceivers
        self.burst_fwd = BurstForwarder(self.trx_list)

        log.info("Init complete")

    def append_trx(self, remote_addr, base_port, name = None):
        """Create a transceiver driven by the shared clock generator
        and register it in the list of all transceivers."""
        trx = FakeTRX(self.argv.trx_bind_addr, remote_addr, base_port,
            clck_gen = self.clck_gen, pwr_meas = self.fake_pm,
            name = name)
        self.trx_list.add_trx(trx)

    def append_child_trx(self, remote_addr, base_port, child_idx, name = None):
        """Create an additional transceiver for remote_addr:base_port.

        Raises IndexError if child_idx is not 0 and no transceiver with
        the given address and base port has been registered before.
        """
        # Index 0 corresponds to the first transceiver
        if child_idx == 0:
            self.append_trx(remote_addr, base_port, name)
            return

        # Find 'parent' transceiver for a new child
        trx_parent = self.trx_list.find_trx(remote_addr, base_port)
        if trx_parent is None:
            raise IndexError("Couldn't find parent transceiver "
                "for '%s:%d/%d'" % (remote_addr, base_port, child_idx))

        # Allocate a new child
        trx_child = FakeTRX(self.argv.trx_bind_addr, remote_addr, base_port,
            child_idx = child_idx, pwr_meas = self.fake_pm, name = name)
        self.trx_list.add_trx(trx_child)

        # Link a new 'child' with its 'parent'
        trx_parent.child_trx_list.add_trx(trx_child)

    def run(self):
        """Main loop: wait for data on any socket and dispatch it."""
        # Compose list of to be monitored sockets
        sock_list = []
        for trx in self.trx_list:
            sock_list.append(trx.ctrl_if.sock)
            sock_list.append(trx.data_if.sock)

        # Enter main loop
        while True:
            # Wait until we get any data on any socket
            r_event, _, _ = select.select(sock_list, [], [])

            # Iterate over all transceivers
            for trx in self.trx_list:
                # DATA interface
                if trx.data_if.sock in r_event:
                    msg = trx.recv_data_msg()
                    # NOTE(review): this guard is not visible in the
                    # reviewed source; recv_data_msg() is assumed to
                    # return None when there is nothing to forward.
                    if msg is not None:
                        self.burst_fwd.forward_msg(trx, msg)

                # CTRL interface
                if trx.ctrl_if.sock in r_event:
                    trx.ctrl_if.handle_rx()

    def shutdown(self):
        """Release resources before exiting."""
        log.info("Shutting down...")

        # Stop clock generator
        # NOTE(review): the call itself is not visible in the reviewed
        # source; CLCKGen is assumed to provide stop().
        self.clck_gen.stop()

    # Parses a TRX definition of the following
    # format: REMOTE_ADDR:BIND_PORT[/TRX_NUM]
    # e.g. [2001:0db8:85a3:0000:0000:8a2e:0370:7334]:5700/5
    # e.g. 127.0.0.1:5700 or 127.0.0.1:5700/1
    # e.g. foo@127.0.0.1:5700 or bar@127.0.0.1:5700/1
    def trx_def(self, val):
        """Convert a TRX definition string to (name, addr, port, idx).

        Used as the argparse 'type' callable of the '--trx' option.
        name is None if no 'NAME@' prefix is given. Raises
        argparse.ArgumentTypeError if val does not match the format.
        """
        # A raw string avoids the invalid '\/' escape sequence, while
        # keeping the pattern itself unchanged
        result = re.match(r"(.+@)?(.+):([0-9]+)(\/[0-9]+)?", val)
        if result is None:
            raise argparse.ArgumentTypeError("Invalid TRX definition: %s" % val)
        (name, addr, port, idx) = result.groups()

        # NOTE(review): the handling of the optional '/TRX_NUM' group is
        # not visible in the reviewed source. Stripping the leading '/'
        # and defaulting to index 0 is assumed, because
        # append_child_trx() compares the index with 0 and formats it
        # with '%d'.
        idx = int(idx[1:]) if idx is not None else 0

        # Cut '@' from TRX name
        if name is not None:
            name = name[:-1]

        return (name, addr, int(port), idx)

    def parse_argv(self):
        """Define and parse command line options; return the namespace."""
        parser = argparse.ArgumentParser(prog = "fake_trx",
            description = "Virtual Um-interface (fake transceiver)")

        # Register common logging options
        self.app_reg_logging_options(parser)

        trx_group = parser.add_argument_group("TRX interface")
        trx_group.add_argument("-b", "--trx-bind-addr",
            dest = "trx_bind_addr", type = str, default = "0.0.0.0",
            help = "Set FakeTRX bind address (default %(default)s)")
        trx_group.add_argument("-R", "--bts-addr",
            dest = "bts_addr", type = str, default = "127.0.0.1",
            help = "Set BTS remote address (default %(default)s)")
        trx_group.add_argument("-r", "--bb-addr",
            dest = "bb_addr", type = str, default = "127.0.0.1",
            help = "Set BB remote address (default %(default)s)")
        trx_group.add_argument("-P", "--bts-base-port",
            dest = "bts_base_port", type = int, default = 5700,
            help = "Set BTS base port number (default %(default)s)")
        trx_group.add_argument("-p", "--bb-base-port",
            dest = "bb_base_port", type = int, default = 6700,
            help = "Set BB base port number (default %(default)s)")

        mtrx_group = parser.add_argument_group("Additional transceivers")
        mtrx_group.add_argument("--trx",
            metavar = "REMOTE_ADDR:BASE_PORT[/TRX_NUM]",
            dest = "trx_list", type = self.trx_def, action = "append",
            help = "Add a transceiver for BTS or MS (e.g. 127.0.0.1:5703)")

        argv = parser.parse_args()

        # Make sure there is no overlap between ports
        if argv.bts_base_port == argv.bb_base_port:
            parser.error("BTS and BB base ports shall be different")

        return argv

    def sig_handler(self, signum, frame):
        """Signal handler registered for SIGINT."""
        log.info("Signal %d received" % signum)
        if signum == signal.SIGINT:
            # NOTE(review): the body of this branch is not visible in
            # the reviewed source; a clean shutdown followed by exit
            # is assumed.
            self.shutdown()
            sys.exit(0)
500 if __name__
== '__main__':