Merge branch 'master' of c-leuse:cerebrum
[cerebrum.git] / tools / randomatic.py
blob70bc50f1658d604e41780f0b6e997a940bd64f53
1 #!/usr/bin/env python3
2 """ Make things flicker. """
4 import time, random, sys
5 import json
6 import argparse
7 from collections import defaultdict
8 import requests
9 from pylibcerebrum.serial_mux import SerialMux
11 parser = argparse.ArgumentParser(description='Make things flicker.')
12 parser.add_argument('port', type=str, help='The TTY the target device is connected to')
13 parser.add_argument('-b', '--baudrate', type=str, default=115200, help='The TTYs baud rate')
14 parser.add_argument('-p', '--publish', type=str, action='append', help='Publish connected switches and inputs to the given JSONRPC server(s)')
15 args = parser.parse_args()
17 # Interval to wait after a failed HTTP/JSONRPC request
18 RETRY_INTERVAL = 10.0
20 s = SerialMux(args.port, args.baudrate)
21 time.sleep(1)
22 print('Discovering cerebrum devices')
23 results = []
24 while not results:
25 results = s.discover()
26 print('Opening first device')
27 g = s.open(0)
28 NODE_NAME = g.config.get('name') or g.config.get('node_id')
29 print('Initializing device, node name', NODE_NAME)
31 matrices = [ m for m in g if m.type == 'matrix_input' ]
32 inputs = [ m for m in g if m.type == 'simple-io' and m.config.get('mode') not in ['pwm', 'output'] ]
33 inputstates = defaultdict(lambda: None)
34 led_matrices = [ m for m in g if m.type == 'led_matrix' ]
35 pwms = [ m for m in g if m.type == 'simple-io' and m.config.get('mode') == 'pwm' ]
36 print('Found', len(inputs), 'inputs,', len(matrices), 'input matrices,', len(led_matrices), 'LED matrices and', len(pwms), 'PWM outputs.')
38 lamp_rand_data = [ [ [ random.randint(0, 1) for _ in range(28) ] for _ in led_matrices ] for _ in range(128) ]
39 pwm_rand_data = [ x for _ in range(8) for x in [[ random.randint(0, 255) for _ in pwms ]]*16 ]
42 jsonrpc_retrytime = defaultdict(lambda: 0)
43 def jsonrpc_notify(io, state):
44 global jsonrpc_retrytime
45 for p in args.publish:
46 if time.time() > jsonrpc_retrytime[p]:
47 try:
48 requests.post(p, data=json.dumps({'method': 'cerebrumNotify', 'params': [NODE_NAME, io, state], 'id': 0}))
49 except Exception as e:
50 print('JSONRPC request failed, waiting', RETRY_INTERVAL, 'seconds:', e)
51 jsonrpc_retrytime[p] = time.time() + RETRY_INTERVAL
54 while True:
55 for lamps, pwmvs in zip(lamp_rand_data, pwm_rand_data):
56 for m, d in zip(led_matrices, lamps):
57 m.buffer = d
58 for m, d in zip(pwms, pwmvs):
59 m.pwm = d
61 for m in matrices:
62 st = m.state
63 for i, e in enumerate(m.config['mapping']):
64 inputid = m.name+'/'+str(i)
66 decstate = 0
67 for bit in e[0]:
68 decstate = (decstate<<1) | st[bit]
70 maxval = 2**len(e[0])
71 statemap = list(range(maxval))
72 statemap[:len(e)-1] = e[1:]
74 state = statemap[decstate]
75 if inputstates[inputid] != state:
76 inputstates[inputid] = state
77 jsonrpc_notify(inputid, state)
78 for i in inputs:
79 st = i.state
80 if inputstates[i.name] != st:
81 inputstates[i.name] = st
82 jsonrpc_notify(i.name, st)
84 time.sleep(0.2)