Added the Ampel and fixed a Python lib escape-handling bug
[cerebrum.git] / tools / trafotron.py
blob8af61062e4230c7ff3c9e1d245459566ff226133
1 #!/usr/bin/env python3
3 import time
4 from threading import Thread
5 import copy
6 import json
7 import requests
8 from http.server import HTTPServer, BaseHTTPRequestHandler
9 from pylibcerebrum.serial_mux import SerialMux
# HTTP endpoints that receive JSON {'method', 'params', 'id'} payloads via POST.
CBEAM = 'http://10.0.1.27:4254/rpc/'
HYPERBLAST = 'http://10.0.1.23:1337/'
CLOCC = 'http://c-lab-lock.cbrp3.c-base.org:1337/'
# Serial device path and baud rate handed to SerialMux.
# NOTE: PORT is rebound to the HTTP listen port further down in this file,
# after the serial multiplexer has already been created.
PORT = '/dev/serial/by-id/usb-Arduino__www.arduino.cc__0043_7523230313535161C072-if00'
BAUDRATE = 115200
# Number of analog5 readings averaged per main-loop iteration.
AVG_SAMPLES = 128
# Presumably the minimum change of the averaged reading before it is sent
# again; only its use for seeding `oldval` is visible here -- TODO confirm.
SEND_THRESHOLD = 3
# Bring up the serial multiplexer and keep polling until a device answers.
s = SerialMux(PORT, BAUDRATE)
print('discovering cerebrum devices')
results = s.discover()
while not results:
    results = s.discover()
print(results)

print('opening first device')
g = s.open(0)
print('initializing device')
print(dir(g))

# Bar status outputs: switch all six pins to output mode with PWM enabled.
for io in (g.digital3, g.digital5, g.digital6,
           g.digital9, g.digital10, g.digital11):
    io.direction = 1
    io.pwm_enabled = 1
print('starting event loop')
# Last values written to the traffic light; None forces the first write.
oldre, oldge, oldgn = None, None, None


def ampel(re, ge, gn):
    """Set the red, yellow and green traffic-light outputs.

    Only outputs whose requested value differs from the previously requested
    one are written to the device. A request for all three lights is reduced
    to red + yellow.

    Args:
        re: Desired state of the red light (truthy = on).
        ge: Desired state of the yellow light (truthy = on).
        gn: Desired state of the green light (truthy = on).
    """
    global oldre, oldge, oldgn
    if re and ge and gn:
        gn = False

    pending = [
        (name, new)
        for name, old, new in (
            ('ampelrot', oldre, re),
            ('ampelgelb', oldge, ge),
            ('ampelgrün', oldgn, gn),
        )
        if old != new
    ]
    # Ensure no more than two lights are on at the same time, even for very
    # short periods of time: apply every switch-off before any switch-on.
    # (Writing in fixed red/yellow/green order would briefly light all three
    # when going from yellow+green to red+yellow.)  sorted() is stable, so the
    # red/yellow/green order is kept within each group.
    for name, new in sorted(pending, key=lambda change: bool(change[1])):
        getattr(g, name).state = new
    oldre, oldge, oldgn = re, ge, gn
# HACK: copied over from barstatus.py
# State shared with the other threads: current bar status, the pair of
# traffic-light patterns to alternate between, and the time of the last
# bar status change (initialised so that no change counts as recent).
barstatus = 'closed'
ampelstate = ((0, 0, 0), (0, 0, 0))
lastchange = time.time() - 180


def animate():
    """Drive the bar status PWM outputs and the traffic light forever.

    Each cycle shows a first frame for 0.33 s and a second frame for 0.66 s.
    For 180 s after a bar status change, 'open' is displayed using the
    'opened' frames and 'closed' using the 'lastcall' frames. Never returns.
    """
    global barstatus, lastchange, ampelstate
    # status -> ((left, right) of first frame, (left, right) of second frame)
    frames = {
        'open': (((10, 255, 10), (128, 128, 128)),
                 ((10, 255, 10), (128, 128, 128))),
        'opened': (((10, 128, 255), (128, 128, 128)),
                   ((10, 255, 10), (128, 128, 128))),
        'closed': (((128, 128, 128), (255, 4, 4)),
                   ((128, 128, 128), (255, 4, 4))),
        'lastcall': (((10, 255, 10), (128, 128, 128)),
                     ((10, 255, 10), (255, 255, 10))),
    }
    transitional = {'open': 'opened', 'closed': 'lastcall'}

    while True:
        key = barstatus
        if time.time() - lastchange < 180:
            key = transitional.get(key, key)
        first, second = frames.get(key)
        for phase, (left, right), pause in ((0, first, 0.33), (1, second, 0.66)):
            g.digital3.pwm, g.digital5.pwm, g.digital6.pwm = left
            g.digital9.pwm, g.digital10.pwm, g.digital11.pwm = right
            # ampelstate is re-read for every frame so updates show up promptly.
            ampel(*ampelstate[phase])
            time.sleep(pause)
# Run the animation in the background; as a daemon thread it does not keep
# the interpreter alive once the main thread exits.
animator = Thread(target=animate, daemon=True)
animator.start()
def sendstate(value, timeout=5.0):
    """POST a 'trafotron' reading to the CBEAM endpoint.

    The HTTP response is ignored. Network failures are reported on stdout
    instead of being raised, so a temporarily unreachable endpoint can
    neither block nor terminate the caller.

    Args:
        value: Reading to send; becomes the single entry of 'params'.
        timeout: Seconds to wait for the endpoint before giving up.
    """
    print('SENDING', value)
    payload = json.dumps({'method': 'trafotron', 'params': [value], 'id': 0})
    try:
        # Without a timeout requests waits indefinitely on a dead peer.
        requests.post(CBEAM, data=payload, timeout=timeout)
    except requests.RequestException as err:
        print('SENDING failed:', err)
class AmpelHandler(BaseHTTPRequestHandler):
    """HTTP handler that lets clients set the traffic-light pattern.

    A POST body of the form ``{"method": "ampel", "params": [...]}`` replaces
    the module-level ``ampelstate``, a pair of (red, yellow, green) boolean
    tuples. Accepted ``params`` shapes:

    * ``[[r, y, g], [r, y, g]]`` -- one triple per entry of the pair,
    * ``[n]`` with a single integer -- bit mask; bits 32/16/8 give the first
      triple and bits 4/2/1 the second,
    * ``[r, y, g]`` -- the same triple for both entries.

    Requests naming any other method are acknowledged and ignored.
    """

    @staticmethod
    def _parse_params(p):
        """Convert the 'params' value of an ampel request into a state pair.

        Raises TypeError, ValueError or LookupError if *p* has none of the
        accepted shapes.
        """
        first = p[0]
        if isinstance(first, list):
            (r1, y1, g1), (r2, y2, g2) = p
            return ((bool(r1), bool(y1), bool(g1)),
                    (bool(r2), bool(y2), bool(g2)))
        # JSON true/false decode to bool, which is an int subclass; those must
        # be treated as a plain triple rather than a bit mask.
        if isinstance(first, int) and not isinstance(first, bool) and len(p) == 1:
            return ((bool(first & 32), bool(first & 16), bool(first & 8)),
                    (bool(first & 4), bool(first & 2), bool(first & 1)))
        red, yellow, green = p
        triple = (bool(red), bool(yellow), bool(green))
        return triple, triple

    def do_POST(self):
        """Handle a JSON request; reply 200 on success, 400 if malformed.

        The body is parsed before any response is sent, and ``ampelstate``
        is left untouched when the request cannot be understood.
        """
        global ampelstate
        try:
            postlen = int(self.headers['Content-Length'])
            if postlen < 0:
                # read(-1) would block until the client closes the connection
                raise ValueError('negative Content-Length')
            data = json.loads(str(self.rfile.read(postlen), 'utf-8'))
            newstate = None
            if data.get('method') == 'ampel':
                newstate = self._parse_params(data.get('params'))
        except (TypeError, ValueError, AttributeError, LookupError):
            self.send_error(400, 'Malformed ampel request')
            return
        if newstate is not None:
            ampelstate = newstate
        self.send_response(200)
        self.end_headers()
# Accept ampel requests on every interface, port 1337, from a daemon thread.
# NOTE: this rebinds PORT, which earlier held the serial device path.
HOST, PORT = '', 1337
server = HTTPServer((HOST, PORT), AmpelHandler)
t = Thread(target=server.serve_forever, daemon=True)
t.start()

time.sleep(2)
118 # Enable pull-up on Arduino analog pin 4
119 g.analog4.state = 1
120 oldval = -2*SEND_THRESHOLD
121 oldbarstate = None
122 newbarstate = None
123 while True:
124 val = sum([ g.analog5.analog for i in range(AVG_SAMPLES)])/AVG_SAMPLES
125 if g.analog4.state:
126 newbarstate = 'closed'
127 else:
128 newbarstate = 'open'
129 if newbarstate != oldbarstate:
130 oldbarstate = newbarstate
132 #comm with animation thread
133 barstatus = newbarstate
134 lastchange = time.time()
136 requests.post(HYPERBLAST, data=json.dumps({'method': 'barstatus', 'params': [newbarstate], 'id': 0}))
137 requests.post(CBEAM, data=json.dumps({'method': 'barstatus', 'params': [newbarstate], 'id': 0}))
138 requests.post(CLOCC, data=json.dumps({'method': 'barstatus', 'params': [newbarstate], 'id': 0}))