3 # esp-idf serial output monitor tool. Does some helpful things:
4 # - Looks up hex addresses in ELF file with addr2line
5 # - Reset ESP32 via serial RTS line (Ctrl-T Ctrl-R)
6 # - Run "make flash" (Ctrl-T Ctrl-F)
7 # - Run "make app-flash" (Ctrl-T Ctrl-A)
8 # - If gdbstub output is detected, gdb is automatically loaded
10 # Copyright 2015-2016 Espressif Systems (Shanghai) PTE LTD
12 # Licensed under the Apache License, Version 2.0 (the "License");
13 # you may not use this file except in compliance with the License.
14 # You may obtain a copy of the License at
16 # http://www.apache.org/licenses/LICENSE-2.0
18 # Unless required by applicable law or agreed to in writing, software
19 # distributed under the License is distributed on an "AS IS" BASIS,
20 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
21 # See the License for the specific language governing permissions and
22 # limitations under the License.
24 # Contains elements taken from miniterm "Very simple serial terminal" which
25 # is part of pySerial. https://github.com/pyserial/pyserial
26 # (C)2002-2015 Chris Liechti <cliechti@gmx.net>
28 # Originally released under BSD-3-Clause license.
30 from __future__ import print_function, division
43 import serial.tools.miniterm as miniterm
47 from distutils.version import StrictVersion
49 key_description = miniterm.key_description
51 # Control-key characters
60 CTRL_RBRACKET = '\x1d' # Ctrl+]
62 # ANSI terminal codes (if changed, regular expressions in LineMatcher need to be updated)
63 ANSI_RED = '\033[1;31m'
64 ANSI_YELLOW = '\033[0;33m'
65 ANSI_NORMAL = '\033[0m'
def color_print(message, color):
    """Write ``message`` to stderr wrapped in an ANSI color sequence.

    The ``color`` escape code is emitted first, then the message, then
    ANSI_NORMAL to reset the terminal attributes, followed by a newline.
    """
    colored_line = "%s%s%s\n" % (color, message, ANSI_NORMAL)
    sys.stderr.write(colored_line)
def yellow_print(message):
    """Print ``message`` to stderr in yellow (delegates to color_print)."""
    color_print(message, color=ANSI_YELLOW)
def red_print(message):
    """Print ``message`` to stderr in red (delegates to color_print)."""
    color_print(message, color=ANSI_RED)
79 # Tags for tuples in queues
84 # regex matches a potential PC value (0x4xxxxxxx)
85 MATCH_PCADDR = re.compile(r'0x4[0-9a-f]{7}', re.IGNORECASE)
87 DEFAULT_TOOLCHAIN_PREFIX = "xtensa-esp32-elf-"
89 DEFAULT_PRINT_FILTER = ""
91 class StoppableThread(object):
93 Provide a Thread-like class which can be 'cancelled' via a subclass-provided
96 Can be started and stopped multiple times.
98 Isn't an instance of type Thread because Python Thread objects can only be run once
106 Is 'alive' whenever the internal thread object exists
108 return self._thread is not None
111 if self._thread is None:
112 self._thread = threading.Thread(target=self._run_outer)
116 pass # override to provide cancellation functionality
119 pass # override for the main thread behaviour
121 def _run_outer(self):
128 if self._thread is not None:
129 old_thread = self._thread
134 class ConsoleReader(StoppableThread):
135 """ Read input keys from the console and push them to the queue,
138 def __init__(self, console, event_queue, test_mode):
139 super(ConsoleReader, self).__init__()
140 self.console = console
141 self.event_queue = event_queue
142 self.test_mode = test_mode
150 # Windows kludge: because the console.cancel() method doesn't
151 # seem to work to unblock getkey() on the Windows implementation.
153 # So we only call getkey() if we know there's a key waiting for us.
155 while not msvcrt.kbhit() and self.alive:
160 # In testing mode the stdin is connected to PTY but is not used for input anything. For PTY
161 # the canceling by fcntl.ioctl isn't working and would hang in self.console.getkey().
162 # Therefore, we avoid calling it.
166 c = self.console.getkey()
167 except KeyboardInterrupt:
170 self.event_queue.put((TAG_KEY, c), False)
172 self.console.cleanup()
175 if os.name == 'posix' and not self.test_mode:
176 # this is the way cancel() is implemented in pyserial 3.3 or newer,
177 # older pyserial (3.1+) has cancellation implemented via 'select',
178 # which does not work when console sends an escape sequence response
180 # even older pyserial (<3.1) does not have this method
182 # on Windows there is a different (also hacky) fix, applied above.
184 # note that TIOCSTI is not implemented in WSL / bash-on-Windows.
185 # TODO: introduce some workaround to make it work there.
187 # Note: This would throw exception in testing mode when the stdin is connected to PTY.
188 import fcntl, termios
189 fcntl.ioctl(self.console.fd, termios.TIOCSTI, b'\0')
191 class SerialReader(StoppableThread):
192 """ Read serial data from the serial port and push to the
193 event queue, until stopped.
195 def __init__(self, serial, event_queue):
196 super(SerialReader, self).__init__()
197 self.baud = serial.baudrate
199 self.event_queue = event_queue
200 if not hasattr(self.serial, 'cancel_read'):
201 # enable timeout for checking alive flag,
202 # if cancel_read not available
203 self.serial.timeout = 0.25
206 if not self.serial.is_open:
207 self.serial.baudrate = self.baud
208 self.serial.rts = True # Force an RTS reset on open
210 self.serial.rts = False
213 data = self.serial.read(self.serial.in_waiting or 1)
215 self.event_queue.put((TAG_SERIAL, data), False)
220 if hasattr(self.serial, 'cancel_read'):
222 self.serial.cancel_read()
228 Assembles a dictionary of filtering rules based on the --print_filter
229 argument of idf_monitor. Then later it is used to match lines and
230 determine whether they should be shown on screen or not.
239 level = {'N': LEVEL_N, 'E': LEVEL_E, 'W': LEVEL_W, 'I': LEVEL_I, 'D': LEVEL_D,
240 'V': LEVEL_V, '*': LEVEL_V, '': LEVEL_V}
242 def __init__(self, print_filter):
244 self._re = re.compile(r'^(?:\033\[[01];?[0-9]+m?)?([EWIDV]) \([0-9]+\) ([^:]+): ')
245 items = print_filter.split()
247 self._dict["*"] = self.LEVEL_V # default is to print everything
251 # specifying no warning level defaults to verbose level
255 raise ValueError('No tag specified in filter ' + f)
257 lev = self.level[s[1].upper()]
259 raise ValueError('Unknown warning level in filter ' + f)
261 raise ValueError('Missing ":" in filter ' + f)
262 self._dict[s[0]] = lev
263 def match(self, line):
265 m = self._re.search(line)
267 lev = self.level[m.group(1)]
268 if m.group(2) in self._dict:
269 return self._dict[m.group(2)] >= lev
270 return self._dict.get("*", self.LEVEL_N) >= lev
271 except (KeyError, IndexError):
272 # Regular line written with something else than ESP_LOG*
275 # We need something more than "*.N" for printing.
276 return self._dict.get("*", self.LEVEL_N) > self.LEVEL_N
278 class SerialStopException(Exception):
280 This exception is used for stopping the IDF monitor in testing mode.
284 class Monitor(object):
286 Monitor application main class.
288 This was originally derived from miniterm.Miniterm, but it turned out to be easier to write from scratch for this
291 Main difference is that all event processing happens in the main thread, not the worker threads.
293 def __init__(self, serial_instance, elf_file, print_filter, make="make", toolchain_prefix=DEFAULT_TOOLCHAIN_PREFIX, eol="CRLF"):
294 super(Monitor, self).__init__()
295 self.event_queue = queue.Queue()
296 self.console = miniterm.Console()
298 sys.stderr = ANSIColorConverter(sys.stderr)
299 self.console.output = ANSIColorConverter(self.console.output)
300 self.console.byte_output = ANSIColorConverter(self.console.byte_output)
302 if StrictVersion(serial.VERSION) < StrictVersion('3.3.0'):
303 # Use Console.getkey implementation from 3.3.0 (to be in sync with the ConsoleReader._cancel patch above)
304 def getkey_patched(self):
305 c = self.enc_stdin.read(1)
306 if c == unichr(0x7f):
307 c = unichr(8) # map the BS key (which yields DEL) to backspace
310 self.console.getkey = types.MethodType(getkey_patched, self.console)
312 socket_mode = serial_instance.port.startswith("socket://") # testing hook - data from serial can make exit the monitor
313 self.serial = serial_instance
314 self.console_reader = ConsoleReader(self.console, self.event_queue, socket_mode)
315 self.serial_reader = SerialReader(self.serial, self.event_queue)
316 self.elf_file = elf_file
318 self.toolchain_prefix = toolchain_prefix
319 self.menu_key = CTRL_T
320 self.exit_key = CTRL_RBRACKET
322 self.translate_eol = {
323 "CRLF": lambda c: c.replace(b"\n", b"\r\n"),
324 "CR": lambda c: c.replace(b"\n", b"\r"),
325 "LF": lambda c: c.replace(b"\r", b"\n"),
329 self._pressed_menu_key = False
330 self._last_line_part = b""
331 self._gdb_buffer = b""
332 self._pc_address_buffer = b""
333 self._line_matcher = LineMatcher(print_filter)
334 self._invoke_processing_last_line_timer = None
335 self._force_line_print = False
336 self._output_enabled = True
337 self._serial_check_exit = socket_mode
def invoke_processing_last_line(self):
    """Queue a flush event so the pending partial line gets finalized.

    Posts a ``(TAG_SERIAL_FLUSH, b'')`` tuple to the event queue without
    blocking; the main loop handles that tag by calling
    handle_serial_input() with ``finalize_line=True``.
    """
    flush_event = (TAG_SERIAL_FLUSH, b'')
    self.event_queue.put(flush_event, block=False)
343 self.console_reader.start()
344 self.serial_reader.start()
346 while self.console_reader.alive and self.serial_reader.alive:
347 (event_tag, data) = self.event_queue.get()
348 if event_tag == TAG_KEY:
349 self.handle_key(data)
350 elif event_tag == TAG_SERIAL:
351 self.handle_serial_input(data)
352 if self._invoke_processing_last_line_timer is not None:
353 self._invoke_processing_last_line_timer.cancel()
354 self._invoke_processing_last_line_timer = threading.Timer(0.1, self.invoke_processing_last_line)
355 self._invoke_processing_last_line_timer.start()
356 # If no further data is received in the next short period
357 # of time then the _invoke_processing_last_line_timer
358 # generates an event which will result in the finishing of
359 # the last line. This is fix for handling lines sent
361 elif event_tag == TAG_SERIAL_FLUSH:
362 self.handle_serial_input(data, finalize_line=True)
364 raise RuntimeError("Bad event data %r" % ((event_tag,data),))
365 except SerialStopException:
369 self.console_reader.stop()
370 self.serial_reader.stop()
371 # Cancelling _invoke_processing_last_line_timer is not
372 # important here because receiving empty data doesn't matter.
373 self._invoke_processing_last_line_timer = None
376 sys.stderr.write(ANSI_NORMAL + "\n")
378 def handle_key(self, key):
379 if self._pressed_menu_key:
380 self.handle_menu_key(key)
381 self._pressed_menu_key = False
382 elif key == self.menu_key:
383 self._pressed_menu_key = True
384 elif key == self.exit_key:
385 self.console_reader.stop()
386 self.serial_reader.stop()
389 key = self.translate_eol(key)
390 self.serial.write(codecs.encode(key))
391 except serial.SerialException:
392 pass # this shouldn't happen, but sometimes port has closed in serial thread
393 except UnicodeEncodeError:
394 pass # this can happen if a non-ascii character was passed, ignoring
396 def handle_serial_input(self, data, finalize_line=False):
397 sp = data.split(b'\n')
398 if self._last_line_part != b"":
399 # add unprocessed part from previous "data" to the first line
400 sp[0] = self._last_line_part + sp[0]
401 self._last_line_part = b""
403 # last part is not a full line
404 self._last_line_part = sp.pop()
407 if self._serial_check_exit and line == self.exit_key:
408 raise SerialStopException()
409 if self._output_enabled and (self._force_line_print or self._line_matcher.match(line)):
410 self.console.write_bytes(line + b'\n')
411 self.handle_possible_pc_address_in_line(line)
412 self.check_gdbstub_trigger(line)
413 self._force_line_print = False
414 # Now we have the last part (incomplete line) in _last_line_part. By
415 # default we don't touch it and just wait for the arrival of the rest
416 # of the line. But after some time when we didn't receive it we need
417 # to make a decision.
418 if self._last_line_part != b"":
419 if self._force_line_print or (finalize_line and self._line_matcher.match(self._last_line_part)):
420 self._force_line_print = True;
421 if self._output_enabled:
422 self.console.write_bytes(self._last_line_part)
423 self.handle_possible_pc_address_in_line(self._last_line_part)
424 self.check_gdbstub_trigger(self._last_line_part)
425 # It is possible that the incomplete line cuts in half the PC
426 # address. A small buffer is kept and will be used the next time
427 # handle_possible_pc_address_in_line is invoked to avoid this problem.
428 # MATCH_PCADDR matches 10 character long addresses. Therefore, we
429 # keep the last 9 characters.
430 self._pc_address_buffer = self._last_line_part[-9:]
431 # GDB sequence can be cut in half also. GDB sequence is 7
432 # characters long, therefore, we save the last 6 characters.
433 self._gdb_buffer = self._last_line_part[-6:]
434 self._last_line_part = b""
435 # else: keeping _last_line_part and it will be processed the next time
436 # handle_serial_input is invoked
def handle_possible_pc_address_in_line(self, line):
    """Look up every potential PC address found in ``line``.

    The tail of the previously printed partial line (kept in
    ``_pc_address_buffer``) is prepended first, so that an address which
    was cut in half between two chunks is still recognised. The buffer
    is cleared afterwards.

    ``line`` is the raw serial data (bytes), while MATCH_PCADDR is
    compiled from a text pattern. On Python 3 a text pattern cannot be
    applied to a bytes object (TypeError), so the line is decoded before
    matching; undecodable bytes are dropped because only the hex
    address text is of interest here.
    """
    line = self._pc_address_buffer + line
    self._pc_address_buffer = b""
    if isinstance(line, bytes):
        text = line.decode(errors="ignore")
    else:
        text = line
    for m in MATCH_PCADDR.finditer(text):
        self.lookup_pc_address(m.group())
444 def handle_menu_key(self, c):
445 if c == self.exit_key or c == self.menu_key: # send verbatim
446 self.serial.write(codecs.encode(c))
447 elif c in [ CTRL_H, 'h', 'H', '?' ]:
448 red_print(self.get_help_text())
449 elif c == CTRL_R: # Reset device via RTS
450 self.serial.setRTS(True)
452 self.serial.setRTS(False)
453 self.output_enable(True)
454 elif c == CTRL_F: # Recompile & upload
455 self.run_make("flash")
456 elif c == CTRL_A: # Recompile & upload app only
457 self.run_make("app-flash")
458 elif c == CTRL_Y: # Toggle output display
461 yellow_print("Pause app (enter bootloader mode), press Ctrl-T Ctrl-R to restart")
462 # to fast trigger pause without press menu key
463 self.serial.setDTR(False) # IO0=HIGH
464 self.serial.setRTS(True) # EN=LOW, chip in reset
465 time.sleep(1.3) # timeouts taken from esptool.py, includes esp32r0 workaround. defaults: 0.1
466 self.serial.setDTR(True) # IO0=LOW
467 self.serial.setRTS(False) # EN=HIGH, chip out of reset
468 time.sleep(0.45) # timeouts taken from esptool.py, includes esp32r0 workaround. defaults: 0.05
469 self.serial.setDTR(False) # IO0=HIGH, done
471 red_print('--- unknown menu character {} --'.format(key_description(c)))
473 def get_help_text(self):
475 --- idf_monitor ({version}) - ESP-IDF monitor tool
476 --- based on miniterm from pySerial
478 --- {exit:8} Exit program
479 --- {menu:8} Menu escape key, followed by:
481 --- {menu:7} Send the menu character itself to remote
482 --- {exit:7} Send the exit character itself to remote
483 --- {reset:7} Reset target board via RTS line
484 --- {make:7} Run 'make flash' to build & flash
485 --- {appmake:7} Run 'make app-flash to build & flash app
486 --- {output:7} Toggle output display
487 --- {pause:7} Reset target into bootloader to pause app via RTS line
488 """.format(version=__version__,
489 exit=key_description(self.exit_key),
490 menu=key_description(self.menu_key),
491 reset=key_description(CTRL_R),
492 make=key_description(CTRL_F),
493 appmake=key_description(CTRL_A),
494 output=key_description(CTRL_Y),
495 pause=key_description(CTRL_P),
499 """ Use 'with self' to temporarily disable monitoring behaviour """
500 self.serial_reader.stop()
501 self.console_reader.stop()
def __exit__(self, *args, **kwargs):
    """Leave a 'with self' block: resume monitoring behaviour.

    Restarts the console reader first, then the serial reader. The
    arguments supplied by the context-manager protocol are ignored and
    nothing is returned.
    """
    for reader in (self.console_reader, self.serial_reader):
        reader.start()
508 def prompt_next_action(self, reason):
509 self.console.setup() # set up console to trap input characters
513 --- Press {} to exit monitor.
514 --- Press {} to run 'make flash'.
515 --- Press {} to run 'make app-flash'.
516 --- Press any other key to resume monitor (resets target).""".format(reason,
517 key_description(self.exit_key),
518 key_description(CTRL_F),
519 key_description(CTRL_A)))
520 k = CTRL_T # ignore CTRL-T here, so people can muscle-memory Ctrl-T Ctrl-F, etc.
522 k = self.console.getkey()
524 self.console.cleanup()
525 if k == self.exit_key:
526 self.event_queue.put((TAG_KEY, k))
527 elif k in [ CTRL_F, CTRL_A ]:
528 self.event_queue.put((TAG_KEY, self.menu_key))
529 self.event_queue.put((TAG_KEY, k))
531 def run_make(self, target):
533 yellow_print("Running make %s..." % target)
534 p = subprocess.Popen([self.make,
538 except KeyboardInterrupt:
540 if p.returncode != 0:
541 self.prompt_next_action("Build failed")
543 self.output_enable(True)
545 def lookup_pc_address(self, pc_addr):
546 translation = subprocess.check_output(
547 ["%saddr2line" % self.toolchain_prefix,
548 "-pfiaC", "-e", self.elf_file, pc_addr],
550 if not "?? ??:0" in translation:
551 yellow_print(translation)
553 def check_gdbstub_trigger(self, line):
554 line = self._gdb_buffer + line
555 self._gdb_buffer = b""
556 m = re.search(b"\\$(T..)#(..)", line) # look for a gdb "reason" for a break
559 chsum = sum(ord(p) for p in m.group(1)) & 0xFF
560 calc_chsum = int(m.group(2), 16)
562 return # payload wasn't valid hex digits
563 if chsum == calc_chsum:
566 red_print("Malformed gdb message... calculated checksum %02x received %02x" % (chsum, calc_chsum))
570 with self: # disable console control
571 sys.stderr.write(ANSI_NORMAL)
573 process = subprocess.Popen(["%sgdb" % self.toolchain_prefix,
574 "-ex", "set serial baud %d" % self.serial.baudrate,
575 "-ex", "target remote %s" % self.serial.port,
576 "-ex", "interrupt", # monitor has already parsed the first 'reason' command, need a second
577 self.elf_file], cwd=".")
579 except KeyboardInterrupt:
580 pass # happens on Windows, maybe other OSes
583 # on Linux, maybe other OSes, gdb sometimes seems to be alive even after wait() returns...
588 # also on Linux, maybe other OSes, gdb sometimes exits uncleanly and breaks the tty mode
589 subprocess.call(["stty", "sane"])
591 pass # don't care if there's no stty, we tried...
592 self.prompt_next_action("gdb exited")
def output_enable(self, enable):
    """Record whether serial output should be shown.

    Stores ``enable`` unchanged in ``_output_enabled``, the flag checked
    by handle_serial_input() before writing lines to the console.
    """
    self._output_enabled = enable
def output_toggle(self):
    """Flip the output-display flag and report the new state in yellow."""
    now_enabled = not self._output_enabled
    self._output_enabled = now_enabled
    yellow_print("\nToggle output display: {}, Type Ctrl-T Ctrl-Y to show/disable output again.".format(now_enabled))
602 parser = argparse.ArgumentParser("idf_monitor - a serial output monitor for esp-idf")
606 help='Serial port device',
607 default=os.environ.get('ESPTOOL_PORT', '/dev/ttyUSB0')
612 help='Serial port baud rate',
614 default=os.environ.get('MONITOR_BAUD', 115200))
618 help='Command to run make',
619 type=str, default='make')
622 '--toolchain-prefix',
623 help="Triplet prefix to add before cross-toolchain names",
624 default=DEFAULT_TOOLCHAIN_PREFIX)
628 choices=['CR', 'LF', 'CRLF'],
629 type=lambda c: c.upper(),
630 help="End of line to use when sending to the serial port",
634 'elf_file', help='ELF file of application',
635 type=argparse.FileType('rb'))
639 help="Filtering string",
640 default=DEFAULT_PRINT_FILTER)
642 args = parser.parse_args()
644 if args.port.startswith("/dev/tty."):
645 args.port = args.port.replace("/dev/tty.", "/dev/cu.")
646 yellow_print("--- WARNING: Serial ports accessed as /dev/tty.* will hang gdb if launched.")
647 yellow_print("--- Using %s instead..." % args.port)
649 serial_instance = serial.serial_for_url(args.port, args.baud,
651 serial_instance.dtr = False
652 serial_instance.rts = False
654 args.elf_file.close() # don't need this as a file
656 # remove the parallel jobserver arguments from MAKEFLAGS, as any
657 # parent make is only running 1 job (monitor), so we can re-spawn
658 # all of the child makes we need (the -j argument remains part of
661 makeflags = os.environ["MAKEFLAGS"]
662 makeflags = re.sub(r"--jobserver[^ =]*=[0-9,]+ ?", "", makeflags)
663 os.environ["MAKEFLAGS"] = makeflags
665 pass # not running a make jobserver
667 monitor = Monitor(serial_instance, args.elf_file.name, args.print_filter, args.make, args.toolchain_prefix, args.eol)
669 yellow_print('--- idf_monitor on {p.name} {p.baudrate} ---'.format(
671 yellow_print('--- Quit: {} | Menu: {} | Help: {} followed by {} ---'.format(
672 key_description(monitor.exit_key),
673 key_description(monitor.menu_key),
674 key_description(monitor.menu_key),
675 key_description(CTRL_H)))
676 if args.print_filter != DEFAULT_PRINT_FILTER:
677 yellow_print('--- Print filter: {} ---'.format(args.print_filter))
682 # Windows console stuff
684 STD_OUTPUT_HANDLE = -11
685 STD_ERROR_HANDLE = -12
688 FOREGROUND_INTENSITY = 8
691 # matches the ANSI color change sequences that IDF sends
692 RE_ANSI_COLOR = re.compile(b'\033\\[([01]);3([0-7])m')
694 # list mapping the 8 ANSI colors (the indexes) to Windows Console colors
695 ANSI_TO_WINDOWS_COLOR = [ 0, 4, 2, 6, 1, 5, 3, 7 ]
697 GetStdHandle = ctypes.windll.kernel32.GetStdHandle
698 SetConsoleTextAttribute = ctypes.windll.kernel32.SetConsoleTextAttribute
700 class ANSIColorConverter(object):
701 """Class to wrap a file-like output stream, intercept ANSI color codes,
702 and convert them into calls to Windows SetConsoleTextAttribute.
704 Doesn't support all ANSI terminal code escape sequences, only the sequences IDF uses.
706 Ironically, in Windows this console output is normally wrapped by winpty which will then detect the console text
707 color changes and convert these back to ANSI color codes for MSYS' terminal to display. However this is the
708 least-bad working solution, as winpty doesn't support any "passthrough" mode for raw output.
711 def __init__(self, output):
713 self.handle = GetStdHandle(STD_ERROR_HANDLE if self.output == sys.stderr else STD_OUTPUT_HANDLE)
716 def _output_write(self, data):
718 self.output.write(data)
720 # Windows 10 bug since the Fall Creators Update, sometimes writing to console randomly throws
721 # an exception (however, the character is still written to the screen)
722 # Ref https://github.com/espressif/esp-idf/issues/1136
725 def write(self, data):
727 l = len(self.matched)
728 if b == '\033': # ESC
730 elif (l == 1 and b == '[') or (1 < l < 7):
732 if self.matched == ANSI_NORMAL: # reset console
733 SetConsoleTextAttribute(self.handle, FOREGROUND_GREY)
735 elif len(self.matched) == 7: # could be an ANSI sequence
736 m = re.match(RE_ANSI_COLOR, self.matched)
738 color = ANSI_TO_WINDOWS_COLOR[int(m.group(2))]
739 if m.group(1) == b'1':
740 color |= FOREGROUND_INTENSITY
741 SetConsoleTextAttribute(self.handle, color)
743 self._output_write(self.matched) # not an ANSI color code, display verbatim
746 self._output_write(b)
753 if __name__ == "__main__":