Fixed node id handling and discovery
[cerebrum.git] / pylibcerebrum / test.py
blob77778dfa47be7e1cc2e06f54f55c3ae1b4cd868e
1 from pylibcerebrum.ganglion import Ganglion
2 from pylibcerebrum.serial_mux import SerialMux
3 from pylibcerebrum.timeout_exception import TimeoutException
4 import unittest
5 import serial
6 import generator
8 class TestGanglion(generator.TestCommStuff):
10 def setUp(self):
11 super(TestGanglion, self).setUp()
13 def test_connect(self):
14 fs = FakeSerial()
15 #fs.inp += b'\x00K]\x00\x00\x80\x00\x00=\x88\x8a\xc6\x94S\x90\x86\xa6c}%:\xbbAj\x14L\xd9\x1a\xae\x93n\r\x10\x83E1\xba]j\xdeG\xb1\xba\xa6[:\xa2\xb9\x8eR~#\xb9\x84%\xa0#q\x87\x17[\xd6\xcdA)J{\xab*\xf7\x96%\xff\xfa\x12g\x00'
16 fs.inp += b'\x00\x3F{"version":0.17,"builddate":"2012-05-23 23:42:17","members":{}}'
18 g = Ganglion(0x2342, ser=fs)
19 self.assertEqual(fs.out, b'\\#\x23\x42\x00\x00\x00\x00', 'The ganglion sent garbage trying to read the device config.')
20 self.assert_('version' in g.config, 'The ganglion has an invalid config without a version attribute')
21 self.assertEqual(g.config['version'], 0.17, 'The ganglion\'s config\'s version attribute is wrong')
23 def test_simple_callback_invocation(self):
24 fs = FakeSerial()
25 g = Ganglion(0x2342, ser=fs, jsonconfig = {'version': 0.17, 'builddate': '2012-05-23 23:42:17', 'members': {"foo": {"type": "test", "functions": {"callback": {"id": 1}}}}})
26 #put the device's response into the input of the ganglion
27 fs.inp += b'\x00\x00\x00\x00'
28 #access the attribute
29 g.foo.callback()
30 self.assertEqual(fs.out, b'\\#\x23\x42\x00\x01\x00\x00', 'Somehow pylibcerebrum sent a wrong command to the device.')
32 def test_complex_callback_invocation(self):
33 fs = FakeSerial()
34 g = Ganglion(0x2342, ser=fs, jsonconfig = {'version': 0.17, 'builddate': '2012-05-23 23:42:17', 'members': {"foo": {"type": "test", "functions": {"callback": {"id": 1, "args": "3B", "returns": "3B"}}}}})
35 #put the device's response into the input of the ganglion
36 fs.inp += b'\x00\x03ABC'
37 #access the attribute
38 foo = g.foo.callback(0x44, 0x45, 0x46)
39 self.assertEqual(fs.out, b'\\#\x23\x42\x00\x01\x00\x03DEF', 'Somehow pylibcerebrum sent a wrong command to the device.')
40 self.assertEqual(foo, [0x41, 0x42, 0x43], 'Somehow a device response was decoded wrong.')
42 def test_attribute_read(self):
43 fs = FakeSerial()
44 g = Ganglion(0x2342, ser=fs, jsonconfig = {'version': 0.17, 'builddate': '2012-05-23 23:42:17', 'members': {"foo": {"type": "test", "properties": {"prop": {"fmt": "B", "id": 1, "size": 1}}}}})
45 #put the device's response into the input of the ganglion
46 fs.inp += b'\x00\x01\x41'
47 #access the attribute
48 foo = g.foo.prop
49 self.assertEqual(fs.out, b'\\#\x23\x42\x00\x01\x00\x00', 'Somehow pylibcerebrum sent a wrong command to the device.')
50 self.assertEqual(foo, 0x41, 'Somehow a device response was decoded wrong.')
52 def test_attribute_read_long(self):
53 #This reads a rather large attribute (>64 bytes) in order to catch problems with multipart callbacks.
54 fs = FakeSerial()
55 g = Ganglion(0x2342, ser=fs, jsonconfig = {'version': 0.17, 'builddate': '2012-05-23 23:42:17', 'members': {"foo": {"type": "test", "properties": {"prop": {"fmt": "65B", "id": 1, "size": 65}}}}})
56 #put the device's response into the input of the ganglion
57 fs.inp += b'\x00\x41AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA'
58 #access the attribute
59 foo = g.foo.prop
60 self.assertEqual(fs.out, b'\\#\x23\x42\x00\x01\x00\x00', 'Somehow pylibcerebrum sent a wrong command to the device.')
61 self.assertEqual(foo, [0x41]*65, 'Somehow a device response was decoded wrong.')
63 def test_attribute_write(self):
64 fs = FakeSerial()
65 g = Ganglion(0x2342, ser=fs, jsonconfig = {'version': 0.17, 'builddate': '2012-05-23 23:42:17', 'members': {"foo": {"type": "test", "properties": {"prop": {"fmt": "B", "id": 1, "size": 1}}}}})
66 #put the device's response into the input of the ganglion
67 fs.inp += b'\x00\x00'
68 #access the attribute
69 g.foo.prop = 0x41
70 self.assertEqual(fs.out, b'\\#\x23\x42\x00\x02\x00\x01\x41', 'Somehow pylibcerebrum sent a wrong command to the device.')
72 def test_attribute_write_long(self):
73 #This writes a rather large attribute (>64 bytes) in order to catch problems with multipart callbacks.
74 fs = FakeSerial()
75 g = Ganglion(0x2342, ser=fs, jsonconfig = {'version': 0.17, 'builddate': '2012-05-23 23:42:17', 'members': {"foo": {"type": "test", "properties": {"prop": {"fmt": "257B", "id": 1, "size": 0x101}}}}})
76 #put the device's response into the input of the ganglion
77 fs.inp += b'\x00\x00'
78 #access the attribute
79 g.foo.prop = (0x41,)*0x101
80 self.assertEqual(fs.out, b'\\#\x23\x42\x00\x02\x01\x01'+b'A'*0x101, 'Somehow pylibcerebrum sent a wrong command to the device.')
81 pass
83 def test_attribute_forbidden_write(self):
84 fs = FakeSerial()
85 g = Ganglion(0x2342, ser=fs, jsonconfig = {'version': 0.17, 'builddate': '2012-05-23 23:42:17', 'members': {"foo": {"type": "test", "properties": {"prop": {"fmt": "B", "id": 1, "size": 1, "access": "r"}}}}})
86 #access the attribute
87 with self.assertRaises(TypeError, msg="prop is a read-only property"):
88 g.foo.prop = 0x41
90 class TestMux(unittest.TestCase):
91 def test_probe(self):
92 fs = FakeSerial()
93 m = SerialMux(ser=fs)
94 fs.inp += b'\xFF'
95 self.assertTrue(m._send_probe(0x2342, 5))
96 self.assertEqual(fs.out, b'\\#\xFF\xFF\x23\x42\x00\x05')
97 self.assertFalse(m._send_probe(0x2342, 5))
99 def test_discovery(self):
100 fs = FakeSerial()
101 m = SerialMux(ser=fs)
102 fs.inp += b'\xFF'*16
103 self.assertEqual(m.discover(), [0])
104 probepacket0 = lambda x: b'\\#\xFF\xFF\x00\x00'+x.to_bytes(2, 'big')
105 probepacket1 = lambda x: b'\\#\xFF\xFF'+(1<<x).to_bytes(2, 'big')+x.to_bytes(2, 'big')
106 self.assertEqual(fs.out, b''.join([probepacket0(i) for i in range(16)] + [probepacket1(15-i) for i in range(16)]))
107 #FIXME test a more complicated example here (unfortunately, this needs a more complicated FakeSerial that I'm currently to lazy to code)
109 class FakeSerial:
111 def __init__(self):
112 self.out = b''
113 self.inp = b''
114 self.timeout=1
116 def read(self, n):
117 if len(self.inp) < n:
118 raise TimeoutException()
119 r = self.inp[0:n]
120 self.inp = self.inp[n:]
121 return r
123 def write(self, bs):
124 if not isinstance(bs, bytes):
125 raise ArgumentError('FakeSerial.write only accepts -bytes-')
126 self.out += bs
128 def __enter__(self):
129 return self
131 def __exit__(self, *args):
132 pass
134 def setDTR(self, state):
135 pass
137 def setXonXoff(self, state):
138 pass
140 def close(self):
141 pass