2 """ Make things flicker. """
4 import time
, random
, sys
7 from collections
import defaultdict
9 from pylibcerebrum
.serial_mux
import SerialMux
11 parser
= argparse
.ArgumentParser(description
='Make things flicker.')
12 parser
.add_argument('port', type=str, help='The TTY the target device is connected to')
13 parser
.add_argument('-b', '--baudrate', type=str, default
=115200, help='The TTYs baud rate')
14 parser
.add_argument('-p', '--publish', type=str, action
='append', help='Publish connected switches and inputs to the given JSONRPC server(s)')
15 args
= parser
.parse_args()
17 # Interval to wait after a failed HTTP/JSONRPC request
20 s
= SerialMux(args
.port
, args
.baudrate
)
22 print('Discovering cerebrum devices')
25 results
= s
.discover()
26 print('Opening first device')
28 NODE_NAME
= g
.config
.get('name') or g
.config
.get('node_id')
29 print('Initializing device, node name', NODE_NAME
)
31 matrices
= [ m
for m
in g
if m
.type == 'matrix_input' ]
32 inputs
= [ m
for m
in g
if m
.type == 'simple-io' and m
.config
.get('mode') not in ['pwm', 'output'] ]
33 inputstates
= defaultdict(lambda: None)
34 led_matrices
= [ m
for m
in g
if m
.type == 'led_matrix' ]
35 pwms
= [ m
for m
in g
if m
.type == 'simple-io' and m
.config
.get('mode') == 'pwm' ]
36 print('Found', len(inputs
), 'inputs,', len(matrices
), 'input matrices,', len(led_matrices
), 'LED matrices and', len(pwms
), 'PWM outputs.')
38 lamp_rand_data
= [ [ [ random
.randint(0, 1) for _
in range(28) ] for _
in led_matrices
] for _
in range(128) ]
39 pwm_rand_data
= [ x
for _
in range(8) for x
in [[ random
.randint(0, 255) for _
in pwms
]]*16 ]
42 jsonrpc_retrytime
= defaultdict(lambda: 0)
43 def jsonrpc_notify(io
, state
):
44 global jsonrpc_retrytime
45 for p
in args
.publish
:
46 if time
.time() > jsonrpc_retrytime
[p
]:
48 requests
.post(p
, data
=json
.dumps({'method': 'cerebrumNotify', 'params': [NODE_NAME
, io
, state
], 'id': 0}))
49 except Exception as e
:
50 print('JSONRPC request failed, waiting', RETRY_INTERVAL
, 'seconds:', e
)
51 jsonrpc_retrytime
[p
] = time
.time() + RETRY_INTERVAL
55 for lamps
, pwmvs
in zip(lamp_rand_data
, pwm_rand_data
):
56 for m
, d
in zip(led_matrices
, lamps
):
58 for m
, d
in zip(pwms
, pwmvs
):
63 for i
, e
in enumerate(m
.config
['mapping']):
64 inputid
= m
.name
+'/'+str(i
)
68 decstate
= (decstate
<<1) | st
[bit
]
71 statemap
= list(range(maxval
))
72 statemap
[:len(e
)-1] = e
[1:]
74 state
= statemap
[decstate
]
75 if inputstates
[inputid
] != state
:
76 inputstates
[inputid
] = state
77 jsonrpc_notify(inputid
, state
)
80 if inputstates
[i
.name
] != st
:
81 inputstates
[i
.name
] = st
82 jsonrpc_notify(i
.name
, st
)