Merge branch 'master' of c-leuse:cerebrum
[cerebrum.git] / tools / telepathy.py
blobae52c930b604853a66db706bcec81389aa8dbd31
1 #!/usr/bin/env python3
2 """ Make things flicker. """
4 import time, random, sys
5 import re
6 import json
7 import argparse
8 from collections import defaultdict
9 from contextlib import contextmanager
10 import requests
11 import socket
12 from pylibcerebrum.serial_mux import SerialMux
13 from flask import Flask, jsonify, request
14 from flask_swagger import ApiParameter, SwaggerApiRegistry
15 from serial.serialutil import SerialException
17 # === Config ===
18 # Interval to wait after a failed HTTP/JSONRPC request
19 HOSTNAME = socket.getfqdn(socket.gethostname())
21 PORT = '/dev/serial/by-id/usb-Arduino__www.arduino.cc__0043_64936333037351C0B032-if00'
22 BAUDRATE = 115200
24 # === Flask infrastructure ===
26 app = Flask("cerebrum")
27 registry = SwaggerApiRegistry(app, baseurl='http://'+HOSTNAME+'/cerebrum')
29 # === Ganglion/Flask REST adapters ===
31 @contextmanager
32 def exitOnSerialException():
33 try:
34 yield
35 except SerialException:
36 print('Serial exception. Exiting.')
37 request.environ.get('werkzeug.server.shutdown')()
39 ARGTYPES = {
40 'c': ('integer', int),
41 'b': ('integer', int),
42 'B': ('integer', int),
43 '?': ('boolean', bool),
44 'h': ('integer', int),
45 'H': ('integer', int),
46 'i': ('integer', int),
47 'I': ('integer', int),
48 'l': ('integer', int),
49 'L': ('integer', int),
50 'q': ('integer', int),
51 'Q': ('integer', int),
52 'f': ('float', float),
53 'd': ('double', float),
54 's': ('string', str),
55 'p': ('string', str),
56 'P': ('string', str)
58 def structGenerator(fmt):
59 global ARGTYPES
60 if fmt[0] in list('@=<>!'):
61 fmt = fmt[1:]
62 for match in re.finditer('(\d*)([cbB?hHiIlLqQfdspP])', fmt):
63 count, t = match.groups()
64 if count:
65 swaggertype, convfunc = ARGTYPES[t]
66 yield '{}[{}]'.format(swaggertype, count), convfunc
67 else:
68 yield ARGTYPES[t]
70 class GenericGanglionAdapter:
71 def __init__(self, parent, g):
72 with exitOnSerialException():
73 self.parent = parent
74 self.g = g
75 self.name = g.name
76 print('Registering', self.name, self.base_url)
77 self.members = [GANGLION_ADAPTERS[m.type](self, m) for _, m in g.members.items()]
79 @app.route(self.base_url, endpoint=self.base_url)
80 def api_list():
81 """List this node's children, functions and properties."""
82 nonlocal self
83 with exitOnSerialException():
84 return jsonify({'members': {m.name: {'url': m.base_url} for m in self.members},
85 'functions': list(g.functions.keys()),
86 'properties': {name: list(zip(*list(structGenerator(fmt))))[0] for name, (_,fmt,_) in g.properties.items()}})
88 for prop, (_,fmt,_) in g.properties.items():
89 _, convfuncs = zip(*list(structGenerator(fmt)))
90 uri = self.base_url +'/'+ prop
91 @app.route(uri, endpoint=uri, methods=['GET', 'POST'])
92 def wrapper():
93 """Auto-generated wrapper function. Will complain when served garbage."""
94 nonlocal self, prop
95 with exitOnSerialException():
96 if request.method == 'GET':
97 return jsonify(getattr(g, prop))
98 elif request.method == 'POST':
99 data = request.get_json(True)
100 assert isinstance(data, list) and len(data) == len(convfuncs)
101 setattr(g, prop, [f(v) for f, v in zip(convfuncs, data)])
102 return '{"result": "success"}\n'
104 @property
105 def base_url(self):
106 return self.parent.base_url +'/'+ self.name
108 GANGLION_ADAPTERS = defaultdict(lambda: GenericGanglionAdapter)
109 def register_ganglion_adapter(cls, ganglion_type):
110 global GANGLION_ADAPTERS
111 GANGLION_ADAPTERS[ganglion_type] = cls
112 return cls
114 class GanglionRestAdapter:
115 def __init__(self, g):
116 with exitOnSerialException():
117 self.name = g.config.get('name') or g.config.get('node_id')
118 #Hardware discovery
119 self.members = [GANGLION_ADAPTERS[m.type](self, g) for m in g]
121 @property
122 def base_url(self):
123 return '/cerebrum'
125 # === Cerebrum serial port setup ===
126 with exitOnSerialException():
127 s = SerialMux(PORT, BAUDRATE)
128 time.sleep(1)
129 print('Discovering cerebrum devices')
130 results = []
131 while not results:
132 results = s.discover()
133 print('Opening devices')
134 adapters = [GanglionRestAdapter(s.open(i)) for _,i in results]
136 if __name__ == '__main__':
137 app.debug = True
138 app.run()