2 from external
.xmodem
import XMODEM
8 import BFinitPassthrough
12 from query_yes_no
import query_yes_no
13 from elrs_helpers
import ElrsUploadResult
# Default UART baud rate, used whenever no explicit upload speed is supplied.
BAUDRATE_DEFAULT = 420000
def dbg_print(line=''):
    """Print ``line`` to stdout and flush the stream immediately.

    The explicit flush makes each message appear as soon as it is emitted
    instead of waiting for the output buffer to fill.

    Args:
        line: Text to print. Defaults to an empty string, in which case only
            the newline appended by ``print`` is written.
    """
    print(line, flush=True)
22 def uart_upload(port
, filename
, baudrate
, ghst
=False, ignore_incorrect_target
=False, key
=None, target
="", accept
=None) -> int:
26 dbg_print("=================== FIRMWARE UPLOAD ===================\n")
27 dbg_print(" Bin file '%s'\n" % filename
)
28 dbg_print(" Port %s @ %s\n" % (port
, baudrate
))
30 logging
.basicConfig(level
=logging
.ERROR
)
33 BootloaderInitSeq1
= bootloader
.get_init_seq('GHST', key
)
35 dbg_print(" Using GHST (half duplex)!\n")
37 BootloaderInitSeq1
= bootloader
.get_init_seq('CRSF', key
)
38 dbg_print(" Using CRSF (full duplex)!\n")
39 BootloaderInitSeq2
= bytes([0x62,0x62,0x62,0x62,0x62,0x62])
41 if not os
.path
.exists(filename
):
42 msg
= "[FAILED] file '%s' does not exist\n" % filename
44 return ElrsUploadResult
.ErrorGeneral
46 s
= serial
.Serial(port
=port
, baudrate
=baudrate
,
47 bytesize
=8, parity
='N', stopbits
=1,
48 timeout
=5, inter_byte_timeout
=None, xonxoff
=0, rtscts
=0)
50 rl
= SerialHelper
.SerialHelper(s
, 2., ["CCC"], half_duplex
)
52 # Check if bootloader *and* passthrough is already active
53 gotBootloader
= 'CCC' in rl
.read_line()
58 # Init Betaflight passthrough
60 BFinitPassthrough
.bf_passthrough_init(port
, baudrate
, half_duplex
)
61 except BFinitPassthrough
.PassthroughEnabled
as info
:
62 dbg_print(" Warning: '%s'\n" % info
)
63 except BFinitPassthrough
.PassthroughFailed
as failed
:
67 s
= serial
.Serial(port
=port
, baudrate
=baudrate
,
68 bytesize
=8, parity
='N', stopbits
=1,
69 timeout
=1, xonxoff
=0, rtscts
=0)
73 # Check again if we're in the bootloader now that passthrough is setup;
74 # Note: This is for button-method flashing
75 gotBootloader
= 'CCC' in rl
.read_line()
79 # legacy bootloader requires a 500ms delay
83 rl
.set_delimiters(["\n", "CCC"])
86 dbg_print("\nAttempting to reboot into bootloader...\n")
88 while gotBootloader
== False:
91 msg
= "[FAILED] to get to BL in reasonable time\n"
93 return ElrsUploadResult
.ErrorGeneral
96 # Enable debug logs after 5 retries
99 dbg_print("[%1u] retry...\n" % currAttempt
)
101 # clear RX buffer before continuing
104 rl
.write(BootloaderInitSeq1
)
107 while ((time
.time() - start
) < 2.):
108 line
= rl
.read_line().strip()
113 if SCRIPT_DEBUG
and line
:
114 dbg_print(" **DBG : '%s'\n" % line
)
116 if "BL_TYPE" in line
:
117 bl_type
= line
[8:].strip()
118 dbg_print(" Bootloader type found : '%s'\n" % bl_type
)
119 # Newer bootloaders do not require delay but keep
120 # a 100ms just in case
124 versionMatch
= re
.search('=== (v.*) ===', line
, re
.IGNORECASE
)
126 bl_version
= versionMatch
.group(1)
127 dbg_print(" Bootloader version found : '%s'\n" % bl_version
)
129 elif "hold down button" in line
:
130 time
.sleep(delay_seq2
)
131 rl
.write(BootloaderInitSeq2
)
136 # Button method has been used so we're not catching the header;
137 # let's jump right to the sanity check if we're getting enough C's
142 if line
!= target
and line
!= accept
and not ignore_incorrect_target
:
143 if query_yes_no("\n\n\nWrong target selected! your RX is '%s', trying to flash '%s', continue? Y/N\n" % (line
, target
)):
144 ignore_incorrect_target
= True
147 dbg_print("Wrong target selected your RX is '%s', trying to flash '%s'" % (line
, target
))
148 return ElrsUploadResult
.ErrorMismatch
150 dbg_print("Verified RX target '%s'" % target
)
152 dbg_print(" Got into bootloader after: %u attempts\n" % currAttempt
)
154 # sanity check! Make sure the bootloader is started
155 dbg_print("Wait sync...")
156 rl
.set_delimiters(["CCC"])
157 if "CCC" not in rl
.read_line(15.):
158 msg
= "[FAILED] Unable to communicate with bootloader...\n"
160 return ElrsUploadResult
.ErrorGeneral
161 dbg_print(" sync OK\n")
163 dbg_print("\nWe were already in bootloader\n")
165 dbg_print("\nWe were already in bootloader\n")
167 # change timeout to 5sec
172 stream
= open(filename
, 'rb')
173 filesize
= os
.stat(filename
).st_size
174 filechunks
= filesize
/ 128
176 dbg_print("\nuploading %d bytes...\n" % filesize
)
178 def StatusCallback(total_packets
, success_count
, error_count
):
180 if (total_packets
% 10 == 0):
181 dbg
= str(round((total_packets
/ filechunks
) * 100)) + "%"
182 if (error_count
> 0):
183 dbg
+= ", err: " + str(error_count
)
186 def getc(size
, timeout
=3):
187 return s
.read(size
) or None
189 def putc(data
, timeout
=3):
193 # Clean RX buffer in case of half duplex
194 # All written data is read into RX buffer
198 s
.reset_input_buffer()
200 modem
= XMODEM(getc
, putc
, mode
='xmodem')
201 #modem.log.setLevel(logging.DEBUG)
202 status
= modem
.send(stream
, retry
=10, callback
=StatusCallback
)
208 dbg_print("Success!!!!\n\n")
209 return ElrsUploadResult
.Success
211 dbg_print("[FAILED] Upload failed!\n\n")
212 return ElrsUploadResult
.ErrorGeneral
215 def on_upload(source
, target
, env
):
218 firmware_path
= str(source
[0])
219 upload_force
= target
[0].name
== 'uploadforce' # 'in' operator doesn't work on Alias list
221 upload_port
= env
.get('UPLOAD_PORT', None)
222 if upload_port
is None:
223 upload_port
= serials_find
.get_serial_port()
224 upload_speed
= env
.get('UPLOAD_SPEED', None)
225 if upload_speed
is None:
226 upload_speed
= BAUDRATE_DEFAULT
228 upload_flags
= env
.get('UPLOAD_FLAGS', [])
229 for line
in upload_flags
:
233 ghst
= eval(flag
.split("=")[1])
234 elif "BL_KEY=" in flag
:
235 envkey
= flag
.split("=")[1]
238 returncode
= uart_upload(upload_port
, firmware_path
, upload_speed
, ghst
, upload_force
, key
=envkey
, target
=re
.sub("_VIA_.*", "", env
['PIOENV'].upper()))
239 except Exception as e
:
240 dbg_print("{0}\n".format(e
))
241 return ElrsUploadResult
.ErrorGeneral
245 if __name__
== '__main__':
246 filename
= 'firmware.bin'
247 baudrate
= BAUDRATE_DEFAULT
249 filename
= sys
.argv
[1]
251 dbg_print("Filename not provided, going to use default firmware.bin")
253 if 2 < len(sys
.argv
):
256 port
= serials_find
.get_serial_port()
258 if 3 < len(sys
.argv
):
259 baudrate
= sys
.argv
[3]
261 returncode
= uart_upload(port
, filename
, baudrate
)