Handle remaining samples < 4 correctly(?).
[calfbox.git] / py / nocturn.py
blob8ac1915512da836751ca8b79dfa9ab0c39b8fd7a
1 # Novation Nocturn driver
2 # Based on DWTFYW code by De Wet van Niekert (dewert). However, I had to
3 # put reading of the input endpoint in a separate thread because the only
4 # reliable way to read it is by using large timeouts (1s or so). With shorter
5 # timeouts, some events are lost/replaced by crossfader value.
7 import array
8 import binascii
9 import fcntl
10 import os
11 import usb.core
12 import usb.util
13 import sys
14 import time
15 import threading
class NocturnCommands:
    """Accumulates Nocturn commands into a single packet string.

    Each setter appends one two-character command to ``self.pkt``: a command
    code (base code plus control index) followed by a one-character argument.
    """

    def __init__(self):
        self.pkt = ""

    def _append(self, code, argument):
        """Append the character for ``code`` followed by ``argument``."""
        self.pkt += chr(code) + argument

    def setModeButtonLight(self, button, state):
        """Queue switching the light of mode button ``button`` on or off."""
        self._append(0x70 + button, '\x01' if state else '\x00')

    def setUserButtonLight(self, button, state):
        """Queue switching the light of user button ``button`` on or off."""
        self._append(0x78 + button, '\x01' if state else '\x00')

    def setEncoderMode(self, encoder, mode):
        """Queue a mode change for ``encoder``; ``mode`` goes in the high nibble."""
        self._append(0x48 + encoder, chr(mode << 4))

    def setEncoderValue(self, encoder, value):
        """Queue setting the value shown for ``encoder``."""
        self._append(0x40 + encoder, chr(value))

    def setSpeedDialMode(self, mode):
        """Queue a speed dial mode change; ``mode`` goes in the high nibble."""
        self._append(0x51, chr(mode << 4))

    def setSpeedDialValue(self, value):
        """Queue setting the value shown for the speed dial."""
        self._append(0x50, chr(value))

    def clear(self):
        """Queue the commands that put every control into its initial state.

        All sixteen button lights are turned off, odd encoders get mode 3 and
        even encoders mode 4, and every encoder as well as the speed dial
        (mode 5) is set to the value 64.
        """
        for index in range(8):
            self.setModeButtonLight(index, False)
            self.setUserButtonLight(index, False)
            self.setEncoderMode(index, 3 if index & 1 else 4)
            self.setEncoderValue(index, 64)
        self.setSpeedDialMode(5)
        self.setSpeedDialValue(64)
class NocturnHandler(threading.Thread):
    """Background reader that forwards Nocturn input packets through a pipe.

    The thread repeatedly calls ``read()`` on the object passed to the
    constructor and writes every packet it gets into a pipe.  The other side
    polls the non-blocking read end with :meth:`poll`; the descriptor
    returned by :meth:`get_poll_fd` can be used to wait for data.
    """

    def __init__(self, n):
        """Create the pipe and mark the thread as a daemon.

        ``n`` is the object whose ``read()`` method supplies the packets.
        The thread is not started here.
        """
        threading.Thread.__init__(self)
        self.nocturn = n
        self.rpipefd, self.wpipefd = os.pipe()
        self.rpipe = os.fdopen(self.rpipefd, "rb")
        self.wpipe = os.fdopen(self.wpipefd, "wb")
        flags = fcntl.fcntl(self.rpipe, fcntl.F_GETFL)
        fcntl.fcntl(self.rpipe, fcntl.F_SETFL, flags | os.O_NONBLOCK)
        # Trailing byte of an incomplete (controller, value) pair, kept until
        # the next poll() delivers its second half.
        self._pending = array.array('B')
        self.daemon = True

    def run(self):
        """Copy packets from the device into the pipe, forever."""
        while True:
            pkt = self.nocturn.read()
            if pkt is not None:
                self.wpipe.write(pkt)
                # Flush right away so the data becomes visible on the read end
                self.wpipe.flush()

    def poll(self, handler):
        """Deliver all input currently waiting in the pipe to ``handler``.

        ``handler`` is called as ``handler(first, second)`` for each pair of
        bytes read; bytes equal to 176 are skipped.  Returns ``None``.
        Does nothing when no data is available.
        """
        try:
            raw = self.rpipe.read()
        except IOError:
            # Python 2 reports an empty non-blocking pipe as an IOError
            return
        if not raw:
            # Python 3 returns None when nothing is available, b"" on EOF
            return
        data = self._pending + array.array('B', raw)
        self._pending = array.array('B')
        i = 0
        end = len(data)
        # For longer sequences, Nocturn skips the control change message
        while i < end:
            if data[i] == 176:
                i += 1
                continue
            if i + 1 >= end:
                # Second byte of the pair has not arrived yet; keep the first
                # one for the next call instead of indexing past the end.
                self._pending = data[i:]
                break
            handler(data[i], data[i + 1])
            i += 2

    def get_poll_fd(self):
        """Return the file descriptor of the pipe's read end."""
        return self.rpipefd
class Nocturn:
    """Driver object for a Novation Nocturn connected over USB.

    The constructor finds the device, sends the initialisation packets,
    resets all controls and starts a ``NocturnHandler`` thread that reads
    the input endpoint.
    """

    vendorID = 0x1235
    productID = 0x000a

    def __init__(self):
        """Open and initialise the device.

        Raises ``ValueError`` when no device with the expected vendor and
        product ID is found.
        """
        dev = usb.core.find(idVendor=self.vendorID, idProduct=self.productID)
        if dev is None:
            raise ValueError('Device not found')

        # The values in here don't seem to matter THAT much
        initPackets = ["b00000", "28002b4a2c002e35", "2a022c722e30"]
        # This is a minimum set that enables the device, but then it doesn't
        # really work reliably, at least the touch sensing
        # initPackets=["b00000", "2800"]

        self.dev = dev
        cfg = dev[1]
        intf = cfg[(0, 0)]

        # ep is the endpoint commands are written to, ep2 the one read from
        self.ep = intf[1]
        self.ep2 = intf[0]
        dev.set_configuration(2)
        for packet in initPackets:
            self.ep.write(binascii.unhexlify(packet))

        self.reset()
        self.reader = NocturnHandler(self)
        self.reader.start()

    def reset(self):
        """Send the command packet built by ``NocturnCommands.clear()``."""
        cmd = NocturnCommands()
        cmd.clear()
        self.execute(cmd)

    def execute(self, cmd):
        """Write the packet accumulated in ``cmd.pkt`` to the device."""
        self.ep.write(cmd.pkt)

    def read(self):
        """Read up to 8 bytes from the input endpoint.

        Returns the data as a byte string, or ``None`` when the read fails
        with a ``usb.core.USBError``.
        """
        try:
            data = self.ep2.read(8, None)
        except usb.core.USBError:
            return None
        # bytes(bytearray(...)) works on both Python 2 and 3; the `buffer`
        # builtin used previously exists only on Python 2.
        return bytes(bytearray(data))

    def poll(self, handler):
        """Forward to the reader thread's ``poll`` and return its result."""
        return self.reader.poll(handler)

    def get_poll_fd(self):
        """Return the reader thread's pollable file descriptor."""
        return self.reader.get_poll_fd()