Check file type being uploaded. (#2756)
[ExpressLRS.git] / src / python / UARTupload.py
blob5d944cf5df1c999e969eaf3128b89cd3aceef3be
1 import serial
2 from external.xmodem import XMODEM
3 import time
4 import sys
5 import logging
6 import os
7 import serials_find
8 import BFinitPassthrough
9 import SerialHelper
10 import re
11 import bootloader
12 from query_yes_no import query_yes_no
13 from elrs_helpers import ElrsUploadResult
15 BAUDRATE_DEFAULT = 420000
17 def dbg_print(line=''):
18 print(line, flush=True)
19 return
22 def uart_upload(port, filename, baudrate, ghst=False, ignore_incorrect_target=False, key=None, target="", accept=None) -> int:
23 SCRIPT_DEBUG = False
24 half_duplex = False
26 dbg_print("=================== FIRMWARE UPLOAD ===================\n")
27 dbg_print(" Bin file '%s'\n" % filename)
28 dbg_print(" Port %s @ %s\n" % (port, baudrate))
30 logging.basicConfig(level=logging.ERROR)
32 if ghst:
33 BootloaderInitSeq1 = bootloader.get_init_seq('GHST', key)
34 half_duplex = True
35 dbg_print(" Using GHST (half duplex)!\n")
36 else:
37 BootloaderInitSeq1 = bootloader.get_init_seq('CRSF', key)
38 dbg_print(" Using CRSF (full duplex)!\n")
39 BootloaderInitSeq2 = bytes([0x62,0x62,0x62,0x62,0x62,0x62])
41 if not os.path.exists(filename):
42 msg = "[FAILED] file '%s' does not exist\n" % filename
43 dbg_print(msg)
44 return ElrsUploadResult.ErrorGeneral
46 s = serial.Serial(port=port, baudrate=baudrate,
47 bytesize=8, parity='N', stopbits=1,
48 timeout=5, inter_byte_timeout=None, xonxoff=0, rtscts=0)
50 rl = SerialHelper.SerialHelper(s, 2., ["CCC"], half_duplex)
52 # Check if bootloader *and* passthrough is already active
53 gotBootloader = 'CCC' in rl.read_line()
55 if not gotBootloader:
56 s.close()
58 # Init Betaflight passthrough
59 try:
60 BFinitPassthrough.bf_passthrough_init(port, baudrate, half_duplex)
61 except BFinitPassthrough.PassthroughEnabled as info:
62 dbg_print(" Warning: '%s'\n" % info)
63 except BFinitPassthrough.PassthroughFailed as failed:
64 SystemExit(failed)
66 # Prepare to upload
67 s = serial.Serial(port=port, baudrate=baudrate,
68 bytesize=8, parity='N', stopbits=1,
69 timeout=1, xonxoff=0, rtscts=0)
70 rl.set_serial(s)
71 rl.clear()
73 # Check again if we're in the bootloader now that passthrough is setup;
74 # Note: This is for button-method flashing
75 gotBootloader = 'CCC' in rl.read_line()
77 # Init bootloader
78 if not gotBootloader:
79 # legacy bootloader requires a 500ms delay
80 delay_seq2 = .5
82 rl.set_timeout(2.)
83 rl.set_delimiters(["\n", "CCC"])
85 currAttempt = 0
86 dbg_print("\nAttempting to reboot into bootloader...\n")
88 while gotBootloader == False:
89 currAttempt += 1
90 if 10 < currAttempt:
91 msg = "[FAILED] to get to BL in reasonable time\n"
92 dbg_print(msg)
93 return ElrsUploadResult.ErrorGeneral
95 if 5 < currAttempt:
96 # Enable debug logs after 5 retries
97 SCRIPT_DEBUG = True
99 dbg_print("[%1u] retry...\n" % currAttempt)
101 # clear RX buffer before continuing
102 rl.clear()
103 # request reboot
104 rl.write(BootloaderInitSeq1)
106 start = time.time()
107 while ((time.time() - start) < 2.):
108 line = rl.read_line().strip()
109 if not line:
110 # timeout
111 continue
113 if SCRIPT_DEBUG and line:
114 dbg_print(" **DBG : '%s'\n" % line)
116 if "BL_TYPE" in line:
117 bl_type = line[8:].strip()
118 dbg_print(" Bootloader type found : '%s'\n" % bl_type)
119 # Newer bootloaders do not require delay but keep
120 # a 100ms just in case
121 delay_seq2 = .1
122 continue
124 versionMatch = re.search('=== (v.*) ===', line, re.IGNORECASE)
125 if versionMatch:
126 bl_version = versionMatch.group(1)
127 dbg_print(" Bootloader version found : '%s'\n" % bl_version)
129 elif "hold down button" in line:
130 time.sleep(delay_seq2)
131 rl.write(BootloaderInitSeq2)
132 gotBootloader = True
133 break
135 elif "CCC" in line:
136 # Button method has been used so we're not catching the header;
137 # let's jump right to the sanity check if we're getting enough C's
138 gotBootloader = True
139 break
141 elif "_RX" in line:
142 if line != target and line != accept and not ignore_incorrect_target:
143 if query_yes_no("\n\n\nWrong target selected! your RX is '%s', trying to flash '%s', continue? Y/N\n" % (line, target)):
144 ignore_incorrect_target = True
145 continue
146 else:
147 dbg_print("Wrong target selected your RX is '%s', trying to flash '%s'" % (line, target))
148 return ElrsUploadResult.ErrorMismatch
149 elif target != "":
150 dbg_print("Verified RX target '%s'" % target)
152 dbg_print(" Got into bootloader after: %u attempts\n" % currAttempt)
154 # sanity check! Make sure the bootloader is started
155 dbg_print("Wait sync...")
156 rl.set_delimiters(["CCC"])
157 if "CCC" not in rl.read_line(15.):
158 msg = "[FAILED] Unable to communicate with bootloader...\n"
159 dbg_print(msg)
160 return ElrsUploadResult.ErrorGeneral
161 dbg_print(" sync OK\n")
162 else:
163 dbg_print("\nWe were already in bootloader\n")
164 else:
165 dbg_print("\nWe were already in bootloader\n")
167 # change timeout to 5sec
168 s.timeout = 5.
169 s.write_timeout = .3
171 # open binary
172 stream = open(filename, 'rb')
173 filesize = os.stat(filename).st_size
174 filechunks = filesize / 128
176 dbg_print("\nuploading %d bytes...\n" % filesize)
178 def StatusCallback(total_packets, success_count, error_count):
179 #sys.stdout.flush()
180 if (total_packets % 10 == 0):
181 dbg = str(round((total_packets / filechunks) * 100)) + "%"
182 if (error_count > 0):
183 dbg += ", err: " + str(error_count)
184 dbg_print(dbg)
186 def getc(size, timeout=3):
187 return s.read(size) or None
189 def putc(data, timeout=3):
190 cnt = s.write(data)
191 if half_duplex:
192 s.flush()
193 # Clean RX buffer in case of half duplex
194 # All written data is read into RX buffer
195 s.read(cnt)
196 return cnt
198 s.reset_input_buffer()
200 modem = XMODEM(getc, putc, mode='xmodem')
201 #modem.log.setLevel(logging.DEBUG)
202 status = modem.send(stream, retry=10, callback=StatusCallback)
204 s.close()
205 stream.close()
207 if (status):
208 dbg_print("Success!!!!\n\n")
209 return ElrsUploadResult.Success
211 dbg_print("[FAILED] Upload failed!\n\n")
212 return ElrsUploadResult.ErrorGeneral
215 def on_upload(source, target, env):
216 envkey = None
217 ghst = False
218 firmware_path = str(source[0])
219 upload_force = target[0].name == 'uploadforce' # 'in' operator doesn't work on Alias list
221 upload_port = env.get('UPLOAD_PORT', None)
222 if upload_port is None:
223 upload_port = serials_find.get_serial_port()
224 upload_speed = env.get('UPLOAD_SPEED', None)
225 if upload_speed is None:
226 upload_speed = BAUDRATE_DEFAULT
228 upload_flags = env.get('UPLOAD_FLAGS', [])
229 for line in upload_flags:
230 flags = line.split()
231 for flag in flags:
232 if "GHST=" in flag:
233 ghst = eval(flag.split("=")[1])
234 elif "BL_KEY=" in flag:
235 envkey = flag.split("=")[1]
237 try:
238 returncode = uart_upload(upload_port, firmware_path, upload_speed, ghst, upload_force, key=envkey, target=re.sub("_VIA_.*", "", env['PIOENV'].upper()))
239 except Exception as e:
240 dbg_print("{0}\n".format(e))
241 return ElrsUploadResult.ErrorGeneral
242 return returncode
245 if __name__ == '__main__':
246 filename = 'firmware.bin'
247 baudrate = BAUDRATE_DEFAULT
248 try:
249 filename = sys.argv[1]
250 except IndexError:
251 dbg_print("Filename not provided, going to use default firmware.bin")
253 if 2 < len(sys.argv):
254 port = sys.argv[2]
255 else:
256 port = serials_find.get_serial_port()
258 if 3 < len(sys.argv):
259 baudrate = sys.argv[3]
261 returncode = uart_upload(port, filename, baudrate)
262 exit(returncode)