import types
from distutils.version import StrictVersion
from io import open
+import textwrap
key_description = miniterm.key_description
CTRL_L = '\x0c'
CTRL_RBRACKET = '\x1d' # Ctrl+]
+# Command parsed from console inputs
+CMD_STOP = 1
+CMD_RESET = 2
+CMD_MAKE = 3
+CMD_APP_FLASH = 4
+CMD_OUTPUT_TOGGLE = 5
+CMD_TOGGLE_LOGGING = 6
+CMD_ENTER_BOOT = 7
+
# ANSI terminal codes (if changed, regular expressions in LineMatcher need to be udpated)
ANSI_RED = '\033[1;31m'
ANSI_YELLOW = '\033[0;33m'
TAG_KEY = 0
TAG_SERIAL = 1
TAG_SERIAL_FLUSH = 2
+TAG_CMD = 3
# regex matches an potential PC value (0x4xxxxxxx)
MATCH_PCADDR = re.compile(r'0x4[0-9a-f]{7}', re.IGNORECASE)
""" Read input keys from the console and push them to the queue,
until stopped.
"""
- def __init__(self, console, event_queue, test_mode):
+ def __init__(self, console, event_queue, cmd_queue, parser, test_mode):
super(ConsoleReader, self).__init__()
self.console = console
self.event_queue = event_queue
+ self.cmd_queue = cmd_queue
+ self.parser = parser
self.test_mode = test_mode
def run(self):
except KeyboardInterrupt:
c = '\x03'
if c is not None:
- self.event_queue.put((TAG_KEY, c), False)
+ ret = self.parser.parse(c)
+ if ret is not None:
+ (tag, cmd) = ret
+ # stop command should be executed last
+ if tag == TAG_CMD and cmd != CMD_STOP:
+ self.cmd_queue.put(ret)
+ else:
+ self.event_queue.put(ret)
+
finally:
self.console.cleanup()
fcntl.ioctl(self.console.fd, termios.TIOCSTI, b'\0')
+class ConsoleParser(object):
+
+ def __init__(self, eol="CRLF"):
+ self.translate_eol = {
+ "CRLF": lambda c: c.replace("\n", "\r\n"),
+ "CR": lambda c: c.replace("\n", "\r"),
+ "LF": lambda c: c.replace("\r", "\n"),
+ }[eol]
+ self.menu_key = CTRL_T
+ self.exit_key = CTRL_RBRACKET
+ self._pressed_menu_key = False
+
+ def parse(self, key):
+ ret = None
+ if self._pressed_menu_key:
+ ret = self._handle_menu_key(key)
+ elif key == self.menu_key:
+ self._pressed_menu_key = True
+ elif key == self.exit_key:
+ ret = (TAG_CMD, CMD_STOP)
+ else:
+ key = self.translate_eol(key)
+ ret = (TAG_KEY, key)
+ return ret
+
+ def _handle_menu_key(self, c):
+ ret = None
+ if c == self.exit_key or c == self.menu_key: # send verbatim
+ ret = (TAG_KEY, c)
+ elif c in [CTRL_H, 'h', 'H', '?']:
+ red_print(self.get_help_text())
+ elif c == CTRL_R: # Reset device via RTS
+ ret = (TAG_CMD, CMD_RESET)
+ elif c == CTRL_F: # Recompile & upload
+ ret = (TAG_CMD, CMD_MAKE)
+ elif c in [CTRL_A, 'a', 'A']: # Recompile & upload app only
+ # "CTRL-A" cannot be captured with the default settings of the Windows command line, therefore, "A" can be used
+ # instead
+ ret = (TAG_CMD, CMD_APP_FLASH)
+ elif c == CTRL_Y: # Toggle output display
+ ret = (TAG_CMD, CMD_OUTPUT_TOGGLE)
+ elif c == CTRL_L: # Toggle saving output into file
+ ret = (TAG_CMD, CMD_TOGGLE_LOGGING)
+ elif c == CTRL_P:
+ yellow_print("Pause app (enter bootloader mode), press Ctrl-T Ctrl-R to restart")
+ # to fast trigger pause without press menu key
+ ret = (TAG_CMD, CMD_ENTER_BOOT)
+ else:
+ red_print('--- unknown menu character {} --'.format(key_description(c)))
+
+ self._pressed_menu_key = False
+ return ret
+
+ def get_help_text(self):
+ text = """\
+ --- idf_monitor ({version}) - ESP-IDF monitor tool
+ --- based on miniterm from pySerial
+ ---
+ --- {exit:8} Exit program
+ --- {menu:8} Menu escape key, followed by:
+ --- Menu keys:
+ --- {menu:14} Send the menu character itself to remote
+ --- {exit:14} Send the exit character itself to remote
+ --- {reset:14} Reset target board via RTS line
+ --- {makecmd:14} Build & flash project
+ --- {appmake:14} Build & flash app only
+ --- {output:14} Toggle output display
+ --- {log:14} Toggle saving output into file
+ --- {pause:14} Reset target into bootloader to pause app via RTS line
+ """.format(version=__version__,
+ exit=key_description(self.exit_key),
+ menu=key_description(self.menu_key),
+ reset=key_description(CTRL_R),
+ makecmd=key_description(CTRL_F),
+ appmake=key_description(CTRL_A) + ' (or A)',
+ output=key_description(CTRL_Y),
+ log=key_description(CTRL_L),
+ pause=key_description(CTRL_P))
+ return textwrap.dedent(text)
+
+ def get_next_action_text(self):
+ text = """\
+ --- Press {} to exit monitor.
+ --- Press {} to build & flash project.
+ --- Press {} to build & flash app.
+ --- Press any other key to resume monitor (resets target).
+ """.format(key_description(self.exit_key),
+ key_description(CTRL_F),
+ key_description(CTRL_A))
+ return textwrap.dedent(text)
+
+ def parse_next_action_key(self, c):
+ ret = None
+ if c == self.exit_key:
+ ret = (TAG_CMD, CMD_STOP)
+ elif c == CTRL_F: # Recompile & upload
+ ret = (TAG_CMD, CMD_MAKE)
+ elif c in [CTRL_A, 'a', 'A']: # Recompile & upload app only
+ # "CTRL-A" cannot be captured with the default settings of the Windows command line, therefore, "A" can be used
+ # instead
+ ret = (TAG_CMD, CMD_APP_FLASH)
+ return ret
+
+
class SerialReader(StoppableThread):
""" Read serial data from the serial port and push to the
event queue, until stopped.
def __init__(self, serial_instance, elf_file, print_filter, make="make", toolchain_prefix=DEFAULT_TOOLCHAIN_PREFIX, eol="CRLF"):
super(Monitor, self).__init__()
self.event_queue = queue.Queue()
+ self.cmd_queue = queue.Queue()
self.console = miniterm.Console()
if os.name == 'nt':
sys.stderr = ANSIColorConverter(sys.stderr, decode_output=True)
socket_mode = serial_instance.port.startswith("socket://") # testing hook - data from serial can make exit the monitor
self.serial = serial_instance
- self.console_reader = ConsoleReader(self.console, self.event_queue, socket_mode)
+ self.console_parser = ConsoleParser(eol)
+ self.console_reader = ConsoleReader(self.console, self.event_queue, self.cmd_queue, self.console_parser, socket_mode)
self.serial_reader = SerialReader(self.serial, self.event_queue)
self.elf_file = elf_file
if not os.path.exists(make):
else:
self.make = make
self.toolchain_prefix = toolchain_prefix
- self.menu_key = CTRL_T
- self.exit_key = CTRL_RBRACKET
-
- self.translate_eol = {
- "CRLF": lambda c: c.replace("\n", "\r\n"),
- "CR": lambda c: c.replace("\n", "\r"),
- "LF": lambda c: c.replace("\r", "\n"),
- }[eol]
# internal state
- self._pressed_menu_key = False
self._last_line_part = b""
self._gdb_buffer = b""
self._pc_address_buffer = b""
self.serial_reader.start()
try:
while self.console_reader.alive and self.serial_reader.alive:
- (event_tag, data) = self.event_queue.get()
- if event_tag == TAG_KEY:
- self.handle_key(data)
+ try:
+ item = self.cmd_queue.get_nowait()
+ except queue.Empty:
+ try:
+ item = self.event_queue.get(False, 0.001)
+ except queue.Empty:
+ continue
+
+ (event_tag, data) = item
+ if event_tag == TAG_CMD:
+ self.handle_commands(data)
+ elif event_tag == TAG_KEY:
+ try:
+ self.serial.write(codecs.encode(data))
+ except serial.SerialException:
+ pass # this shouldn't happen, but sometimes port has closed in serial thread
+ except UnicodeEncodeError:
+ pass # this can happen if a non-ascii character was passed, ignoring
elif event_tag == TAG_SERIAL:
self.handle_serial_input(data)
if self._invoke_processing_last_line_timer is not None:
pass
sys.stderr.write(ANSI_NORMAL + "\n")
- def handle_key(self, key):
- if self._pressed_menu_key:
- self.handle_menu_key(key)
- self._pressed_menu_key = False
- elif key == self.menu_key:
- self._pressed_menu_key = True
- elif key == self.exit_key:
- self.console_reader.stop()
- self.serial_reader.stop()
- else:
- try:
- key = self.translate_eol(key)
- self.serial.write(codecs.encode(key))
- except serial.SerialException:
- pass # this shouldn't happen, but sometimes port has closed in serial thread
- except UnicodeEncodeError:
- pass # this can happen if a non-ascii character was passed, ignoring
-
def handle_serial_input(self, data, finalize_line=False):
sp = data.split(b'\n')
if self._last_line_part != b"":
self._last_line_part = sp.pop()
for line in sp:
if line != b"":
- if self._serial_check_exit and line == self.exit_key.encode('latin-1'):
+ if self._serial_check_exit and line == self.console_parser.exit_key.encode('latin-1'):
raise SerialStopException()
if self._force_line_print or self._line_matcher.match(line.decode(errors="ignore")):
self._print(line + b'\n')
for m in re.finditer(MATCH_PCADDR, line.decode(errors="ignore")):
self.lookup_pc_address(m.group())
- def handle_menu_key(self, c):
- if c == self.exit_key or c == self.menu_key: # send verbatim
- self.serial.write(codecs.encode(c))
- elif c in [CTRL_H, 'h', 'H', '?']:
- red_print(self.get_help_text())
- elif c == CTRL_R: # Reset device via RTS
- self.serial.setRTS(True)
- time.sleep(0.2)
- self.serial.setRTS(False)
- self.output_enable(True)
- elif c == CTRL_F: # Recompile & upload
- self.run_make("flash")
- elif c in [CTRL_A, 'a', 'A']: # Recompile & upload app only
- # "CTRL-A" cannot be captured with the default settings of the Windows command line, therefore, "A" can be used
- # instead
- self.run_make("app-flash")
- elif c == CTRL_Y: # Toggle output display
- self.output_toggle()
- elif c == CTRL_L: # Toggle saving output into file
- self.toggle_logging()
- elif c == CTRL_P:
- yellow_print("Pause app (enter bootloader mode), press Ctrl-T Ctrl-R to restart")
- # to fast trigger pause without press menu key
- self.serial.setDTR(False) # IO0=HIGH
- self.serial.setRTS(True) # EN=LOW, chip in reset
- time.sleep(1.3) # timeouts taken from esptool.py, includes esp32r0 workaround. defaults: 0.1
- self.serial.setDTR(True) # IO0=LOW
- self.serial.setRTS(False) # EN=HIGH, chip out of reset
- time.sleep(0.45) # timeouts taken from esptool.py, includes esp32r0 workaround. defaults: 0.05
- self.serial.setDTR(False) # IO0=HIGH, done
- else:
- red_print('--- unknown menu character {} --'.format(key_description(c)))
-
- def get_help_text(self):
- return """
---- idf_monitor ({version}) - ESP-IDF monitor tool
---- based on miniterm from pySerial
----
---- {exit:8} Exit program
---- {menu:8} Menu escape key, followed by:
---- Menu keys:
---- {menu:14} Send the menu character itself to remote
---- {exit:14} Send the exit character itself to remote
---- {reset:14} Reset target board via RTS line
---- {makecmd:14} Build & flash project
---- {appmake:14} Build & flash app only
---- {output:14} Toggle output display
---- {log:14} Toggle saving output into file
---- {pause:14} Reset target into bootloader to pause app via RTS line
-""".format(version=__version__,
- exit=key_description(self.exit_key),
- menu=key_description(self.menu_key),
- reset=key_description(CTRL_R),
- makecmd=key_description(CTRL_F),
- appmake=key_description(CTRL_A) + ' (or A)',
- output=key_description(CTRL_Y),
- log=key_description(CTRL_L),
- pause=key_description(CTRL_P))
-
def __enter__(self):
""" Use 'with self' to temporarily disable monitoring behaviour """
self.serial_reader.stop()
def prompt_next_action(self, reason):
self.console.setup() # set up console to trap input characters
try:
- red_print("""
---- {}
---- Press {} to exit monitor.
---- Press {} to build & flash project.
---- Press {} to build & flash app.
---- Press any other key to resume monitor (resets target).""".format(reason,
- key_description(self.exit_key),
- key_description(CTRL_F),
- key_description(CTRL_A)))
+ red_print("--- {}".format(reason))
+ red_print(self.console_parser.get_next_action_text())
+
k = CTRL_T # ignore CTRL-T here, so people can muscle-memory Ctrl-T Ctrl-F, etc.
while k == CTRL_T:
k = self.console.getkey()
finally:
self.console.cleanup()
- if k == self.exit_key:
- self.event_queue.put((TAG_KEY, k))
- elif k in [CTRL_F, CTRL_A]:
- self.event_queue.put((TAG_KEY, self.menu_key))
- self.event_queue.put((TAG_KEY, k))
+ ret = self.console_parser.parse_next_action_key(k)
+ if ret is not None:
+ cmd = ret[1]
+ if cmd == CMD_STOP:
+ # the stop command should be handled last
+ self.event_queue.put(ret)
+ else:
+ self.cmd_queue.put(ret)
def run_make(self, target):
with self:
# don't fill-up the screen with the previous errors (probably consequent prints would fail also)
self.stop_logging()
+ def handle_commands(self, cmd):
+ if cmd == CMD_STOP:
+ self.console_reader.stop()
+ self.serial_reader.stop()
+ elif cmd == CMD_RESET:
+ self.serial.setRTS(True)
+ time.sleep(0.2)
+ self.serial.setRTS(False)
+ self.output_enable(True)
+ elif cmd == CMD_MAKE:
+ self.run_make("flash")
+ elif cmd == CMD_APP_FLASH:
+ self.run_make("app-flash")
+ elif cmd == CMD_OUTPUT_TOGGLE:
+ self.output_toggle()
+ elif cmd == CMD_TOGGLE_LOGGING:
+ self.toggle_logging()
+ elif cmd == CMD_ENTER_BOOT:
+ self.serial.setDTR(False) # IO0=HIGH
+ self.serial.setRTS(True) # EN=LOW, chip in reset
+ time.sleep(1.3) # timeouts taken from esptool.py, includes esp32r0 workaround. defaults: 0.1
+ self.serial.setDTR(True) # IO0=LOW
+ self.serial.setRTS(False) # EN=HIGH, chip out of reset
+ time.sleep(0.45) # timeouts taken from esptool.py, includes esp32r0 workaround. defaults: 0.05
+ self.serial.setDTR(False) # IO0=HIGH, done
+ else:
+ raise RuntimeError("Bad command data %d" % (cmd))
+
def main():
parser = argparse.ArgumentParser("idf_monitor - a serial output monitor for esp-idf")
yellow_print('--- idf_monitor on {p.name} {p.baudrate} ---'.format(
p=serial_instance))
yellow_print('--- Quit: {} | Menu: {} | Help: {} followed by {} ---'.format(
- key_description(monitor.exit_key),
- key_description(monitor.menu_key),
- key_description(monitor.menu_key),
+ key_description(monitor.console_parser.exit_key),
+ key_description(monitor.console_parser.menu_key),
+ key_description(monitor.console_parser.menu_key),
key_description(CTRL_H)))
if args.print_filter != DEFAULT_PRINT_FILTER:
yellow_print('--- Print filter: {} ---'.format(args.print_filter))