3 # esp-idf serial output monitor tool. Does some helpful things:
4 # - Looks up hex addresses in ELF file with addr2line
5 # - Reset ESP32 via serial RTS line (Ctrl-T Ctrl-R)
6 # - Run "make (or idf.py) flash" (Ctrl-T Ctrl-F)
7 # - Run "make (or idf.py) app-flash" (Ctrl-T Ctrl-A)
8 # - If gdbstub output is detected, gdb is automatically loaded
10 # Copyright 2015-2016 Espressif Systems (Shanghai) PTE LTD
12 # Licensed under the Apache License, Version 2.0 (the "License");
13 # you may not use this file except in compliance with the License.
14 # You may obtain a copy of the License at
16 # http://www.apache.org/licenses/LICENSE-2.0
18 # Unless required by applicable law or agreed to in writing, software
19 # distributed under the License is distributed on an "AS IS" BASIS,
20 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
21 # See the License for the specific language governing permissions and
22 # limitations under the License.
24 # Contains elements taken from miniterm "Very simple serial terminal" which
25 # is part of pySerial. https://github.com/pyserial/pyserial
26 # (C)2002-2015 Chris Liechti <cliechti@gmx.net>
28 # Originally released under BSD-3-Clause license.
30 from __future__ import print_function, division
31 from __future__ import unicode_literals
32 from future import standard_library
33 standard_library.install_aliases()
34 from builtins import chr
35 from builtins import object
36 from builtins import bytes
50 import serial.tools.miniterm as miniterm
54 from distutils.version import StrictVersion
56 key_description = miniterm.key_description
58 # Control-key characters
67 CTRL_RBRACKET = '\x1d' # Ctrl+]
69 # ANSI terminal codes (if changed, regular expressions in LineMatcher need to be updated)
70 ANSI_RED = '\033[1;31m'
71 ANSI_YELLOW = '\033[0;33m'
72 ANSI_NORMAL = '\033[0m'
def color_print(message, color):
    """Write ``message`` to stderr wrapped in an ANSI color sequence.

    The ``color`` escape code is emitted first, then the message, then
    ANSI_NORMAL to reset the terminal, followed by a newline.
    """
    colored_line = "%s%s%s\n" % (color, message, ANSI_NORMAL)
    sys.stderr.write(colored_line)
def yellow_print(message):
    """Print ``message`` to stderr in yellow by delegating to color_print."""
    color_print(message, color=ANSI_YELLOW)
def red_print(message):
    """Print ``message`` to stderr in red by delegating to color_print."""
    color_print(message, color=ANSI_RED)
86 # Tags for tuples in queues
91 # regex matches a potential PC value (0x4xxxxxxx)
92 MATCH_PCADDR = re.compile(r'0x4[0-9a-f]{7}', re.IGNORECASE)
94 DEFAULT_TOOLCHAIN_PREFIX = "xtensa-esp32-elf-"
96 DEFAULT_PRINT_FILTER = ""
98 class StoppableThread(object):
100 Provide a Thread-like class which can be 'cancelled' via a subclass-provided
103 Can be started and stopped multiple times.
105 Isn't an instance of type Thread because Python Thread objects can only be run once
113 Is 'alive' whenever the internal thread object exists
115 return self._thread is not None
118 if self._thread is None:
119 self._thread = threading.Thread(target=self._run_outer)
123 pass # override to provide cancellation functionality
126 pass # override for the main thread behaviour
128 def _run_outer(self):
135 if self._thread is not None:
136 old_thread = self._thread
141 class ConsoleReader(StoppableThread):
142 """ Read input keys from the console and push them to the queue,
def __init__(self, console, event_queue, test_mode):
    """Initialise the base class and remember the collaborators.

    console     -- console object, stored as ``self.console``
    event_queue -- queue that events are put on, stored as ``self.event_queue``
    test_mode   -- flag stored as ``self.test_mode``
    """
    super(ConsoleReader, self).__init__()
    # The three attributes are independent of one another.
    self.test_mode = test_mode
    self.event_queue = event_queue
    self.console = console
157 # Windows kludge: because the console.cancel() method doesn't
158 # seem to work to unblock getkey() on the Windows implementation.
160 # So we only call getkey() if we know there's a key waiting for us.
162 while not msvcrt.kbhit() and self.alive:
167 # In testing mode stdin is connected to a PTY but is not used for any input. For a PTY,
168 # cancelling via fcntl.ioctl does not work and the thread would hang in self.console.getkey().
169 # Therefore, we avoid calling it.
173 c = self.console.getkey()
174 except KeyboardInterrupt:
177 self.event_queue.put((TAG_KEY, c), False)
179 self.console.cleanup()
182 if os.name == 'posix' and not self.test_mode:
183 # this is the way cancel() is implemented in pyserial 3.3 or newer,
184 # older pyserial (3.1+) has cancellation implemented via 'select',
185 # which does not work when console sends an escape sequence response
187 # even older pyserial (<3.1) does not have this method
189 # on Windows there is a different (also hacky) fix, applied above.
191 # note that TIOCSTI is not implemented in WSL / bash-on-Windows.
192 # TODO: introduce some workaround to make it work there.
194 # Note: This would throw exception in testing mode when the stdin is connected to PTY.
195 import fcntl, termios
196 fcntl.ioctl(self.console.fd, termios.TIOCSTI, b'\0')
198 class SerialReader(StoppableThread):
199 """ Read serial data from the serial port and push to the
200 event queue, until stopped.
202 def __init__(self, serial, event_queue):
203 super(SerialReader, self).__init__()
204 self.baud = serial.baudrate
206 self.event_queue = event_queue
207 if not hasattr(self.serial, 'cancel_read'):
208 # enable timeout for checking alive flag,
209 # if cancel_read not available
210 self.serial.timeout = 0.25
213 if not self.serial.is_open:
214 self.serial.baudrate = self.baud
215 self.serial.rts = True # Force an RTS reset on open
217 self.serial.rts = False
220 data = self.serial.read(self.serial.in_waiting or 1)
222 self.event_queue.put((TAG_SERIAL, data), False)
227 if hasattr(self.serial, 'cancel_read'):
229 self.serial.cancel_read()
233 class LineMatcher(object):
235 Assembles a dictionary of filtering rules based on the --print_filter
236 argument of idf_monitor. Then later it is used to match lines and
237 determine whether they should be shown on screen or not.
246 level = {'N': LEVEL_N, 'E': LEVEL_E, 'W': LEVEL_W, 'I': LEVEL_I, 'D': LEVEL_D,
247 'V': LEVEL_V, '*': LEVEL_V, '': LEVEL_V}
249 def __init__(self, print_filter):
251 self._re = re.compile(r'^(?:\033\[[01];?[0-9]+m?)?([EWIDV]) \([0-9]+\) ([^:]+): ')
252 items = print_filter.split()
254 self._dict["*"] = self.LEVEL_V # default is to print everything
258 # specifying no warning level defaults to verbose level
262 raise ValueError('No tag specified in filter ' + f)
264 lev = self.level[s[1].upper()]
266 raise ValueError('Unknown warning level in filter ' + f)
268 raise ValueError('Missing ":" in filter ' + f)
269 self._dict[s[0]] = lev
270 def match(self, line):
272 m = self._re.search(line)
274 lev = self.level[m.group(1)]
275 if m.group(2) in self._dict:
276 return self._dict[m.group(2)] >= lev
277 return self._dict.get("*", self.LEVEL_N) >= lev
278 except (KeyError, IndexError):
279 # Regular line written with something else than ESP_LOG*
282 # We need something more than "*.N" for printing.
283 return self._dict.get("*", self.LEVEL_N) > self.LEVEL_N
285 class SerialStopException(Exception):
287 This exception is used for stopping the IDF monitor in testing mode.
291 class Monitor(object):
293 Monitor application main class.
295 This was originally derived from miniterm.Miniterm, but it turned out to be easier to write from scratch for this
298 Main difference is that all event processing happens in the main thread, not the worker threads.
300 def __init__(self, serial_instance, elf_file, print_filter, make="make", toolchain_prefix=DEFAULT_TOOLCHAIN_PREFIX, eol="CRLF"):
301 super(Monitor, self).__init__()
302 self.event_queue = queue.Queue()
303 self.console = miniterm.Console()
305 sys.stderr = ANSIColorConverter(sys.stderr, decode_output=True)
306 self.console.output = ANSIColorConverter(self.console.output)
307 self.console.byte_output = ANSIColorConverter(self.console.byte_output)
309 if StrictVersion(serial.VERSION) < StrictVersion('3.3.0'):
310 # Use Console.getkey implementation from 3.3.0 (to be in sync with the ConsoleReader._cancel patch above)
311 def getkey_patched(self):
312 c = self.enc_stdin.read(1)
314 c = chr(8) # map the BS key (which yields DEL) to backspace
317 self.console.getkey = types.MethodType(getkey_patched, self.console)
319 socket_mode = serial_instance.port.startswith("socket://") # testing hook - data from serial can make exit the monitor
320 self.serial = serial_instance
321 self.console_reader = ConsoleReader(self.console, self.event_queue, socket_mode)
322 self.serial_reader = SerialReader(self.serial, self.event_queue)
323 self.elf_file = elf_file
324 if not os.path.exists(make):
325 self.make = shlex.split(make) # allow for possibility the "make" arg is a list of arguments (for idf.py)
328 self.toolchain_prefix = toolchain_prefix
329 self.menu_key = CTRL_T
330 self.exit_key = CTRL_RBRACKET
332 self.translate_eol = {
333 "CRLF": lambda c: c.replace("\n", "\r\n"),
334 "CR": lambda c: c.replace("\n", "\r"),
335 "LF": lambda c: c.replace("\r", "\n"),
339 self._pressed_menu_key = False
340 self._last_line_part = b""
341 self._gdb_buffer = b""
342 self._pc_address_buffer = b""
343 self._line_matcher = LineMatcher(print_filter)
344 self._invoke_processing_last_line_timer = None
345 self._force_line_print = False
346 self._output_enabled = True
347 self._serial_check_exit = socket_mode
def invoke_processing_last_line(self):
    """Queue a TAG_SERIAL_FLUSH event with an empty payload, without blocking."""
    flush_event = (TAG_SERIAL_FLUSH, b'')
    self.event_queue.put_nowait(flush_event)
353 self.console_reader.start()
354 self.serial_reader.start()
356 while self.console_reader.alive and self.serial_reader.alive:
357 (event_tag, data) = self.event_queue.get()
358 if event_tag == TAG_KEY:
359 self.handle_key(data)
360 elif event_tag == TAG_SERIAL:
361 self.handle_serial_input(data)
362 if self._invoke_processing_last_line_timer is not None:
363 self._invoke_processing_last_line_timer.cancel()
364 self._invoke_processing_last_line_timer = threading.Timer(0.1, self.invoke_processing_last_line)
365 self._invoke_processing_last_line_timer.start()
366 # If no further data is received in the next short period
367 # of time then the _invoke_processing_last_line_timer
368 # generates an event which will result in the finishing of
369 # the last line. This is fix for handling lines sent
371 elif event_tag == TAG_SERIAL_FLUSH:
372 self.handle_serial_input(data, finalize_line=True)
374 raise RuntimeError("Bad event data %r" % ((event_tag,data),))
375 except SerialStopException:
379 self.console_reader.stop()
380 self.serial_reader.stop()
381 # Cancelling _invoke_processing_last_line_timer is not
382 # important here because receiving empty data doesn't matter.
383 self._invoke_processing_last_line_timer = None
386 sys.stderr.write(ANSI_NORMAL + "\n")
388 def handle_key(self, key):
389 if self._pressed_menu_key:
390 self.handle_menu_key(key)
391 self._pressed_menu_key = False
392 elif key == self.menu_key:
393 self._pressed_menu_key = True
394 elif key == self.exit_key:
395 self.console_reader.stop()
396 self.serial_reader.stop()
399 key = self.translate_eol(key)
400 self.serial.write(codecs.encode(key))
401 except serial.SerialException:
402 pass # this shouldn't happen, but sometimes port has closed in serial thread
403 except UnicodeEncodeError:
404 pass # this can happen if a non-ascii character was passed, ignoring
406 def handle_serial_input(self, data, finalize_line=False):
407 sp = data.split(b'\n')
408 if self._last_line_part != b"":
409 # add unprocessed part from previous "data" to the first line
410 sp[0] = self._last_line_part + sp[0]
411 self._last_line_part = b""
413 # last part is not a full line
414 self._last_line_part = sp.pop()
417 if self._serial_check_exit and line == self.exit_key.encode('latin-1'):
418 raise SerialStopException()
419 if self._output_enabled and (self._force_line_print or self._line_matcher.match(line.decode(errors="ignore"))):
420 self.console.write_bytes(line + b'\n')
421 self.handle_possible_pc_address_in_line(line)
422 self.check_gdbstub_trigger(line)
423 self._force_line_print = False
424 # Now we have the last part (incomplete line) in _last_line_part. By
425 # default we don't touch it and just wait for the arrival of the rest
426 # of the line. But if it has still not been received after some time, we need
427 # to make a decision.
428 if self._last_line_part != b"":
429 if self._force_line_print or (finalize_line and self._line_matcher.match(self._last_line_part.decode(errors="ignore"))):
430 self._force_line_print = True;
431 if self._output_enabled:
432 self.console.write_bytes(self._last_line_part)
433 self.handle_possible_pc_address_in_line(self._last_line_part)
434 self.check_gdbstub_trigger(self._last_line_part)
435 # It is possible that the incomplete line cuts in half the PC
436 # address. A small buffer is kept and will be used the next time
437 # handle_possible_pc_address_in_line is invoked to avoid this problem.
438 # MATCH_PCADDR matches 10 character long addresses. Therefore, we
439 # keep the last 9 characters.
440 self._pc_address_buffer = self._last_line_part[-9:]
441 # GDB sequence can be cut in half also. GDB sequence is 7
442 # characters long, therefore, we save the last 6 characters.
443 self._gdb_buffer = self._last_line_part[-6:]
444 self._last_line_part = b""
445 # else: keeping _last_line_part and it will be processed the next time
446 # handle_serial_input is invoked
def handle_possible_pc_address_in_line(self, line):
    """Look up every PC-like address that appears in ``line``.

    Any bytes held in ``self._pc_address_buffer`` are prepended to ``line``
    and the buffer is cleared. The combined bytes are decoded (undecodable
    bytes are ignored) and each MATCH_PCADDR hit is passed to
    ``self.lookup_pc_address``.
    """
    combined = self._pc_address_buffer + line
    self._pc_address_buffer = b""
    text = combined.decode(errors="ignore")
    for found in MATCH_PCADDR.finditer(text):
        self.lookup_pc_address(found.group())
454 def handle_menu_key(self, c):
455 if c == self.exit_key or c == self.menu_key: # send verbatim
456 self.serial.write(codecs.encode(c))
457 elif c in [ CTRL_H, 'h', 'H', '?' ]:
458 red_print(self.get_help_text())
459 elif c == CTRL_R: # Reset device via RTS
460 self.serial.setRTS(True)
462 self.serial.setRTS(False)
463 self.output_enable(True)
464 elif c == CTRL_F: # Recompile & upload
465 self.run_make("flash")
466 elif c == CTRL_A: # Recompile & upload app only
467 self.run_make("app-flash")
468 elif c == CTRL_Y: # Toggle output display
471 yellow_print("Pause app (enter bootloader mode), press Ctrl-T Ctrl-R to restart")
472 # to quickly trigger a pause without pressing the menu key
473 self.serial.setDTR(False) # IO0=HIGH
474 self.serial.setRTS(True) # EN=LOW, chip in reset
475 time.sleep(1.3) # timeouts taken from esptool.py, includes esp32r0 workaround. defaults: 0.1
476 self.serial.setDTR(True) # IO0=LOW
477 self.serial.setRTS(False) # EN=HIGH, chip out of reset
478 time.sleep(0.45) # timeouts taken from esptool.py, includes esp32r0 workaround. defaults: 0.05
479 self.serial.setDTR(False) # IO0=HIGH, done
481 red_print('--- unknown menu character {} --'.format(key_description(c)))
483 def get_help_text(self):
485 --- idf_monitor ({version}) - ESP-IDF monitor tool
486 --- based on miniterm from pySerial
488 --- {exit:8} Exit program
489 --- {menu:8} Menu escape key, followed by:
491 --- {menu:7} Send the menu character itself to remote
492 --- {exit:7} Send the exit character itself to remote
493 --- {reset:7} Reset target board via RTS line
494 --- {makecmd:7} Build & flash project
495 --- {appmake:7} Build & flash app only
496 --- {output:7} Toggle output display
497 --- {pause:7} Reset target into bootloader to pause app via RTS line
498 """.format(version=__version__,
499 exit=key_description(self.exit_key),
500 menu=key_description(self.menu_key),
501 reset=key_description(CTRL_R),
502 makecmd=key_description(CTRL_F),
503 appmake=key_description(CTRL_A),
504 output=key_description(CTRL_Y),
505 pause=key_description(CTRL_P) )
508 """ Use 'with self' to temporarily disable monitoring behaviour """
509 self.serial_reader.stop()
510 self.console_reader.stop()
def __exit__(self, *args, **kwargs):
    """Leave a 'with self' block by restarting the reader threads.

    The console reader is started first, then the serial reader. The
    arguments supplied by the with statement are accepted and ignored.
    """
    # Attributes are looked up one at a time so the start order is preserved.
    for reader_name in ("console_reader", "serial_reader"):
        getattr(self, reader_name).start()
517 def prompt_next_action(self, reason):
518 self.console.setup() # set up console to trap input characters
522 --- Press {} to exit monitor.
523 --- Press {} to build & flash project.
524 --- Press {} to build & flash app.
525 --- Press any other key to resume monitor (resets target).""".format(reason,
526 key_description(self.exit_key),
527 key_description(CTRL_F),
528 key_description(CTRL_A) ))
529 k = CTRL_T # ignore CTRL-T here, so people can muscle-memory Ctrl-T Ctrl-F, etc.
531 k = self.console.getkey()
533 self.console.cleanup()
534 if k == self.exit_key:
535 self.event_queue.put((TAG_KEY, k))
536 elif k in [ CTRL_F, CTRL_A ]:
537 self.event_queue.put((TAG_KEY, self.menu_key))
538 self.event_queue.put((TAG_KEY, k))
540 def run_make(self, target):
542 if isinstance(self.make, list):
543 popen_args = self.make + [ target ]
545 popen_args = [ self.make, target ]
546 yellow_print("Running %s..." % " ".join(popen_args))
547 p = subprocess.Popen(popen_args)
550 except KeyboardInterrupt:
552 if p.returncode != 0:
553 self.prompt_next_action("Build failed")
555 self.output_enable(True)
557 def lookup_pc_address(self, pc_addr):
558 translation = subprocess.check_output(
559 ["%saddr2line" % self.toolchain_prefix,
560 "-pfiaC", "-e", self.elf_file, pc_addr],
562 if not b"?? ??:0" in translation:
563 yellow_print(translation.decode())
565 def check_gdbstub_trigger(self, line):
566 line = self._gdb_buffer + line
567 self._gdb_buffer = b""
568 m = re.search(b"\\$(T..)#(..)", line) # look for a gdb "reason" for a break
571 chsum = sum(ord(bytes([p])) for p in m.group(1)) & 0xFF
572 calc_chsum = int(m.group(2), 16)
574 return # payload wasn't valid hex digits
575 if chsum == calc_chsum:
578 red_print("Malformed gdb message... calculated checksum %02x received %02x" % (chsum, calc_chsum))
582 with self: # disable console control
583 sys.stderr.write(ANSI_NORMAL)
585 process = subprocess.Popen(["%sgdb" % self.toolchain_prefix,
586 "-ex", "set serial baud %d" % self.serial.baudrate,
587 "-ex", "target remote %s" % self.serial.port,
588 "-ex", "interrupt", # monitor has already parsed the first 'reason' command, need a second
589 self.elf_file], cwd=".")
591 except KeyboardInterrupt:
592 pass # happens on Windows, maybe other OSes
595 # on Linux, maybe other OSes, gdb sometimes seems to be alive even after wait() returns...
600 # also on Linux, maybe other OSes, gdb sometimes exits uncleanly and breaks the tty mode
601 subprocess.call(["stty", "sane"])
603 pass # don't care if there's no stty, we tried...
604 self.prompt_next_action("gdb exited")
def output_enable(self, enable):
    """Set the flag that controls whether serial output is displayed.

    enable -- new value stored in ``self._output_enabled``
    """
    self._output_enabled = enable
def output_toggle(self):
    """Flip the output-display flag and report the new state in yellow."""
    now_enabled = not self._output_enabled
    self._output_enabled = now_enabled
    yellow_print("\nToggle output display: {}, Type Ctrl-T Ctrl-Y to show/disable output again.".format(now_enabled))
614 parser = argparse.ArgumentParser("idf_monitor - a serial output monitor for esp-idf")
618 help='Serial port device',
619 default=os.environ.get('ESPTOOL_PORT', '/dev/ttyUSB0')
624 help='Serial port baud rate',
626 default=os.environ.get('MONITOR_BAUD', 115200))
630 help='Command to run make',
631 type=str, default='make')
634 '--toolchain-prefix',
635 help="Triplet prefix to add before cross-toolchain names",
636 default=DEFAULT_TOOLCHAIN_PREFIX)
640 choices=['CR', 'LF', 'CRLF'],
641 type=lambda c: c.upper(),
642 help="End of line to use when sending to the serial port",
646 'elf_file', help='ELF file of application',
647 type=argparse.FileType('rb'))
651 help="Filtering string",
652 default=DEFAULT_PRINT_FILTER)
654 args = parser.parse_args()
656 if args.port.startswith("/dev/tty."):
657 args.port = args.port.replace("/dev/tty.", "/dev/cu.")
658 yellow_print("--- WARNING: Serial ports accessed as /dev/tty.* will hang gdb if launched.")
659 yellow_print("--- Using %s instead..." % args.port)
661 serial_instance = serial.serial_for_url(args.port, args.baud,
663 serial_instance.dtr = False
664 serial_instance.rts = False
666 args.elf_file.close() # don't need this as a file
668 # remove the parallel jobserver arguments from MAKEFLAGS, as any
669 # parent make is only running 1 job (monitor), so we can re-spawn
670 # all of the child makes we need (the -j argument remains part of
673 makeflags = os.environ["MAKEFLAGS"]
674 makeflags = re.sub(r"--jobserver[^ =]*=[0-9,]+ ?", "", makeflags)
675 os.environ["MAKEFLAGS"] = makeflags
677 pass # not running a make jobserver
679 monitor = Monitor(serial_instance, args.elf_file.name, args.print_filter, args.make, args.toolchain_prefix, args.eol)
681 yellow_print('--- idf_monitor on {p.name} {p.baudrate} ---'.format(
683 yellow_print('--- Quit: {} | Menu: {} | Help: {} followed by {} ---'.format(
684 key_description(monitor.exit_key),
685 key_description(monitor.menu_key),
686 key_description(monitor.menu_key),
687 key_description(CTRL_H)))
688 if args.print_filter != DEFAULT_PRINT_FILTER:
689 yellow_print('--- Print filter: {} ---'.format(args.print_filter))
694 # Windows console stuff
696 STD_OUTPUT_HANDLE = -11
697 STD_ERROR_HANDLE = -12
700 FOREGROUND_INTENSITY = 8
703 # matches the ANSI color change sequences that IDF sends
704 RE_ANSI_COLOR = re.compile(b'\033\\[([01]);3([0-7])m')
706 # list mapping the 8 ANSI colors (the indexes) to Windows Console colors
707 ANSI_TO_WINDOWS_COLOR = [ 0, 4, 2, 6, 1, 5, 3, 7 ]
709 GetStdHandle = ctypes.windll.kernel32.GetStdHandle
710 SetConsoleTextAttribute = ctypes.windll.kernel32.SetConsoleTextAttribute
712 class ANSIColorConverter(object):
713 """Class to wrap a file-like output stream, intercept ANSI color codes,
714 and convert them into calls to Windows SetConsoleTextAttribute.
716 Doesn't support all ANSI terminal code escape sequences, only the sequences IDF uses.
718 Ironically, in Windows this console output is normally wrapped by winpty which will then detect the console text
719 color changes and convert these back to ANSI color codes for MSYS' terminal to display. However this is the
720 least-bad working solution, as winpty doesn't support any "passthrough" mode for raw output.
723 def __init__(self, output=None, decode_output=False):
725 self.decode_output = decode_output
726 self.handle = GetStdHandle(STD_ERROR_HANDLE if self.output == sys.stderr else STD_OUTPUT_HANDLE)
729 def _output_write(self, data):
731 if self.decode_output:
732 self.output.write(data.decode())
734 self.output.write(data)
736 # Windows 10 bug since the Fall Creators Update, sometimes writing to console randomly throws
737 # an exception (however, the character is still written to the screen)
738 # Ref https://github.com/espressif/esp-idf/issues/1136
741 def write(self, data):
742 if isinstance(data, bytes):
743 data = bytearray(data)
745 data = bytearray(data, 'utf-8')
748 l = len(self.matched)
749 if b == b'\033': # ESC
751 elif (l == 1 and b == b'[') or (1 < l < 7):
753 if self.matched == ANSI_NORMAL.encode('latin-1'): # reset console
754 # Flush is required only with Python3 - switching color before it is printed would mess up the console
756 SetConsoleTextAttribute(self.handle, FOREGROUND_GREY)
758 elif len(self.matched) == 7: # could be an ANSI sequence
759 m = re.match(RE_ANSI_COLOR, self.matched)
761 color = ANSI_TO_WINDOWS_COLOR[int(m.group(2))]
762 if m.group(1) == b'1':
763 color |= FOREGROUND_INTENSITY
764 # Flush is required only with Python3 - switching color before it is printed would mess up the console
766 SetConsoleTextAttribute(self.handle, color)
768 self._output_write(self.matched) # not an ANSI color code, display verbatim
771 self._output_write(b)
777 if __name__ == "__main__":