2 """ Make things flicker. """
4 import time
, random
, sys
8 from collections
import defaultdict
9 from contextlib
import contextmanager
12 from pylibcerebrum
.serial_mux
import SerialMux
13 from flask
import Flask
, jsonify
, request
14 from flask_swagger
import ApiParameter
, SwaggerApiRegistry
15 from serial
.serialutil
import SerialException
18 # Interval to wait after a failed HTTP/JSONRPC request
19 HOSTNAME
= socket
.getfqdn(socket
.gethostname())
21 PORT
= '/dev/serial/by-id/usb-Arduino__www.arduino.cc__0043_64936333037351C0B032-if00'
24 # === Flask infrastructure ===
26 app
= Flask("cerebrum")
27 registry
= SwaggerApiRegistry(app
, baseurl
='http://'+HOSTNAME
+'/cerebrum')
29 # === Ganglion/Flask REST adapters ===
32 def exitOnSerialException():
35 except SerialException
:
36 print('Serial exception. Exiting.')
37 request
.environ
.get('werkzeug.server.shutdown')()
40 'c': ('integer', int),
41 'b': ('integer', int),
42 'B': ('integer', int),
43 '?': ('boolean', bool),
44 'h': ('integer', int),
45 'H': ('integer', int),
46 'i': ('integer', int),
47 'I': ('integer', int),
48 'l': ('integer', int),
49 'L': ('integer', int),
50 'q': ('integer', int),
51 'Q': ('integer', int),
52 'f': ('float', float),
53 'd': ('double', float),
58 def structGenerator(fmt
):
60 if fmt
[0] in list('@=<>!'):
62 for match
in re
.finditer('(\d*)([cbB?hHiIlLqQfdspP])', fmt
):
63 count
, t
= match
.groups()
65 swaggertype
, convfunc
= ARGTYPES
[t
]
66 yield '{}[{}]'.format(swaggertype
, count
), convfunc
70 class GenericGanglionAdapter
:
71 def __init__(self
, parent
, g
):
72 with
exitOnSerialException():
76 print('Registering', self
.name
, self
.base_url
)
77 self
.members
= [GANGLION_ADAPTERS
[m
.type](self
, m
) for _
, m
in g
.members
.items()]
79 @app.route(self
.base_url
, endpoint
=self
.base_url
)
81 """List this node's children, functions and properties."""
83 with
exitOnSerialException():
84 return jsonify({'members': {m
.name
: {'url': m
.base_url
} for m
in self
.members
},
85 'functions': list(g
.functions
.keys()),
86 'properties': {name
: list(zip(*list(structGenerator(fmt
))))[0] for name
, (_
,fmt
,_
) in g
.properties
.items()}})
88 for prop
, (_
,fmt
,_
) in g
.properties
.items():
89 _
, convfuncs
= zip(*list(structGenerator(fmt
)))
90 uri
= self
.base_url
+'/'+ prop
91 @app.route(uri
, endpoint
=uri
, methods
=['GET', 'POST'])
93 """Auto-generated wrapper function. Will complain when served garbage."""
95 with
exitOnSerialException():
96 if request
.method
== 'GET':
97 return jsonify(getattr(g
, prop
))
98 elif request
.method
== 'POST':
99 data
= request
.get_json(True)
100 assert isinstance(data
, list) and len(data
) == len(convfuncs
)
101 setattr(g
, prop
, [f(v
) for f
, v
in zip(convfuncs
, data
)])
102 return '{"result": "success"}\n'
106 return self
.parent
.base_url
+'/'+ self
.name
108 GANGLION_ADAPTERS
= defaultdict(lambda: GenericGanglionAdapter
)
109 def register_ganglion_adapter(cls
, ganglion_type
):
110 global GANGLION_ADAPTERS
111 GANGLION_ADAPTERS
[ganglion_type
] = cls
114 class GanglionRestAdapter
:
115 def __init__(self
, g
):
116 with
exitOnSerialException():
117 self
.name
= g
.config
.get('name') or g
.config
.get('node_id')
119 self
.members
= [GANGLION_ADAPTERS
[m
.type](self
, g
) for m
in g
]
125 # === Cerebrum serial port setup ===
126 with
exitOnSerialException():
127 s
= SerialMux(PORT
, BAUDRATE
)
129 print('Discovering cerebrum devices')
132 results
= s
.discover()
133 print('Opening devices')
134 adapters
= [GanglionRestAdapter(s
.open(i
)) for _
,i
in results
]
136 if __name__
== '__main__':