1 from pylibcerebrum
.ganglion
import Ganglion
2 from pylibcerebrum
.serial_mux
import SerialMux
3 from pylibcerebrum
.timeout_exception
import TimeoutException
8 class TestGanglion(generator
.TestCommStuff
):
11 super(TestGanglion
, self
).setUp()
13 def test_connect(self
):
15 #fs.inp += b'\x00K]\x00\x00\x80\x00\x00=\x88\x8a\xc6\x94S\x90\x86\xa6c}%:\xbbAj\x14L\xd9\x1a\xae\x93n\r\x10\x83E1\xba]j\xdeG\xb1\xba\xa6[:\xa2\xb9\x8eR~#\xb9\x84%\xa0#q\x87\x17[\xd6\xcdA)J{\xab*\xf7\x96%\xff\xfa\x12g\x00'
16 fs
.inp
+= b
'\x00\x3F{"version":0.17,"builddate":"2012-05-23 23:42:17","members":{}}'
18 g
= Ganglion(0x2342, ser
=fs
)
19 self
.assertEqual(fs
.out
, b
'\\#\x23\x42\x00\x00\x00\x00', 'The ganglion sent garbage trying to read the device config.')
20 self
.assert_('version' in g
.config
, 'The ganglion has an invalid config without a version attribute')
21 self
.assertEqual(g
.config
['version'], 0.17, 'The ganglion\'s config\'s version attribute is wrong')
23 def test_simple_callback_invocation(self
):
25 g
= Ganglion(0x2342, ser
=fs
, jsonconfig
= {'version': 0.17, 'builddate': '2012-05-23 23:42:17', 'members': {"foo": {"type": "test", "functions": {"callback": {"id": 1}}}}})
26 #put the device's response into the input of the ganglion
27 fs
.inp
+= b
'\x00\x00\x00\x00'
30 self
.assertEqual(fs
.out
, b
'\\#\x23\x42\x00\x01\x00\x00', 'Somehow pylibcerebrum sent a wrong command to the device.')
32 def test_complex_callback_invocation(self
):
34 g
= Ganglion(0x2342, ser
=fs
, jsonconfig
= {'version': 0.17, 'builddate': '2012-05-23 23:42:17', 'members': {"foo": {"type": "test", "functions": {"callback": {"id": 1, "args": "3B", "returns": "3B"}}}}})
35 #put the device's response into the input of the ganglion
36 fs
.inp
+= b
'\x00\x03ABC'
38 foo
= g
.foo
.callback(0x44, 0x45, 0x46)
39 self
.assertEqual(fs
.out
, b
'\\#\x23\x42\x00\x01\x00\x03DEF', 'Somehow pylibcerebrum sent a wrong command to the device.')
40 self
.assertEqual(foo
, [0x41, 0x42, 0x43], 'Somehow a device response was decoded wrong.')
42 def test_attribute_read(self
):
44 g
= Ganglion(0x2342, ser
=fs
, jsonconfig
= {'version': 0.17, 'builddate': '2012-05-23 23:42:17', 'members': {"foo": {"type": "test", "properties": {"prop": {"fmt": "B", "id": 1, "size": 1}}}}})
45 #put the device's response into the input of the ganglion
46 fs
.inp
+= b
'\x00\x01\x41'
49 self
.assertEqual(fs
.out
, b
'\\#\x23\x42\x00\x01\x00\x00', 'Somehow pylibcerebrum sent a wrong command to the device.')
50 self
.assertEqual(foo
, 0x41, 'Somehow a device response was decoded wrong.')
52 def test_attribute_read_long(self
):
53 #This reads a rather large attribute (>64 bytes) in order to catch problems with multipart callbacks.
55 g
= Ganglion(0x2342, ser
=fs
, jsonconfig
= {'version': 0.17, 'builddate': '2012-05-23 23:42:17', 'members': {"foo": {"type": "test", "properties": {"prop": {"fmt": "65B", "id": 1, "size": 65}}}}})
56 #put the device's response into the input of the ganglion
57 fs
.inp
+= b
'\x00\x41AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA'
60 self
.assertEqual(fs
.out
, b
'\\#\x23\x42\x00\x01\x00\x00', 'Somehow pylibcerebrum sent a wrong command to the device.')
61 self
.assertEqual(foo
, [0x41]*65, 'Somehow a device response was decoded wrong.')
63 def test_attribute_write(self
):
65 g
= Ganglion(0x2342, ser
=fs
, jsonconfig
= {'version': 0.17, 'builddate': '2012-05-23 23:42:17', 'members': {"foo": {"type": "test", "properties": {"prop": {"fmt": "B", "id": 1, "size": 1}}}}})
66 #put the device's response into the input of the ganglion
70 self
.assertEqual(fs
.out
, b
'\\#\x23\x42\x00\x02\x00\x01\x41', 'Somehow pylibcerebrum sent a wrong command to the device.')
72 def test_attribute_write_long(self
):
73 #This writes a rather large attribute (>64 bytes) in order to catch problems with multipart callbacks.
75 g
= Ganglion(0x2342, ser
=fs
, jsonconfig
= {'version': 0.17, 'builddate': '2012-05-23 23:42:17', 'members': {"foo": {"type": "test", "properties": {"prop": {"fmt": "257B", "id": 1, "size": 0x101}}}}})
76 #put the device's response into the input of the ganglion
79 g
.foo
.prop
= (0x41,)*0x101
80 self
.assertEqual(fs
.out
, b
'\\#\x23\x42\x00\x02\x01\x01'+b
'A'*0x101, 'Somehow pylibcerebrum sent a wrong command to the device.')
83 def test_attribute_forbidden_write(self
):
85 g
= Ganglion(0x2342, ser
=fs
, jsonconfig
= {'version': 0.17, 'builddate': '2012-05-23 23:42:17', 'members': {"foo": {"type": "test", "properties": {"prop": {"fmt": "B", "id": 1, "size": 1, "access": "r"}}}}})
87 with self
.assertRaises(TypeError, msg
="prop is a read-only property"):
90 class TestMux(unittest
.TestCase
):
95 self
.assertTrue(m
._send
_probe
(0x2342, 5))
96 self
.assertEqual(fs
.out
, b
'\\#\xFF\xFF\x23\x42\x00\x05')
97 self
.assertFalse(m
._send
_probe
(0x2342, 5))
99 def test_discovery(self
):
101 m
= SerialMux(ser
=fs
)
103 self
.assertEqual(m
.discover(), [0])
104 probepacket0
= lambda x
: b
'\\#\xFF\xFF\x00\x00'+x
.to_bytes(2, 'big')
105 probepacket1
= lambda x
: b
'\\#\xFF\xFF'+(1<<x
).to_bytes(2, 'big')+x
.to_bytes(2, 'big')
106 self
.assertEqual(fs
.out
, b
''.join([probepacket0(i
) for i
in range(16)] + [probepacket1(15-i
) for i
in range(16)]))
107 #FIXME test a more complicated example here (unfortunately, this needs a more complicated FakeSerial that I'm currently to lazy to code)
117 if len(self
.inp
) < n
:
118 raise TimeoutException()
120 self
.inp
= self
.inp
[n
:]
124 if not isinstance(bs
, bytes
):
125 raise ArgumentError('FakeSerial.write only accepts -bytes-')
131 def __exit__(self
, *args
):
134 def setDTR(self
, state
):
137 def setXonXoff(self
, state
):