]> granicus.if.org Git - esp-idf/commitdiff
idf_monitor: improve the responding of console commands
authorMichael (XIAO Xufeng) <xiaoxufeng@espressif.com>
Sat, 6 Jul 2019 04:43:35 +0000 (12:43 +0800)
committerMichael (XIAO Xufeng) <xiaoxufeng@espressif.com>
Thu, 1 Aug 2019 03:55:42 +0000 (11:55 +0800)
tools/idf_monitor.py

index f68020613fc70d94b6e2631292f380885f59860f..5320149dbe172f6a09025879dbe227b4ea8de576 100755 (executable)
@@ -52,6 +52,7 @@ import ctypes
 import types
 from distutils.version import StrictVersion
 from io import open
+import textwrap
 
 key_description = miniterm.key_description
 
@@ -67,6 +68,15 @@ CTRL_P = '\x10'
 CTRL_L = '\x0c'
 CTRL_RBRACKET = '\x1d'  # Ctrl+]
 
+# Command parsed from console inputs
+CMD_STOP = 1
+CMD_RESET = 2
+CMD_MAKE = 3
+CMD_APP_FLASH = 4
+CMD_OUTPUT_TOGGLE = 5
+CMD_TOGGLE_LOGGING = 6
+CMD_ENTER_BOOT = 7
+
 # ANSI terminal codes (if changed, regular expressions in LineMatcher need to be udpated)
 ANSI_RED = '\033[1;31m'
 ANSI_YELLOW = '\033[0;33m'
@@ -92,6 +102,7 @@ __version__ = "1.1"
 TAG_KEY = 0
 TAG_SERIAL = 1
 TAG_SERIAL_FLUSH = 2
+TAG_CMD = 3
 
 # regex matches an potential PC value (0x4xxxxxxx)
 MATCH_PCADDR = re.compile(r'0x4[0-9a-f]{7}', re.IGNORECASE)
@@ -149,10 +160,12 @@ class ConsoleReader(StoppableThread):
     """ Read input keys from the console and push them to the queue,
     until stopped.
     """
-    def __init__(self, console, event_queue, test_mode):
+    def __init__(self, console, event_queue, cmd_queue, parser, test_mode):
         super(ConsoleReader, self).__init__()
         self.console = console
         self.event_queue = event_queue
+        self.cmd_queue = cmd_queue
+        self.parser = parser
         self.test_mode = test_mode
 
     def run(self):
@@ -181,7 +194,15 @@ class ConsoleReader(StoppableThread):
                 except KeyboardInterrupt:
                     c = '\x03'
                 if c is not None:
-                    self.event_queue.put((TAG_KEY, c), False)
+                    ret = self.parser.parse(c)
+                    if ret is not None:
+                        (tag, cmd) = ret
+                        # stop command should be executed last
+                        if tag == TAG_CMD and cmd != CMD_STOP:
+                            self.cmd_queue.put(ret)
+                        else:
+                            self.event_queue.put(ret)
+
         finally:
             self.console.cleanup()
 
@@ -204,6 +225,110 @@ class ConsoleReader(StoppableThread):
             fcntl.ioctl(self.console.fd, termios.TIOCSTI, b'\0')
 
 
+class ConsoleParser(object):
+
+    def __init__(self, eol="CRLF"):
+        self.translate_eol = {
+            "CRLF": lambda c: c.replace("\n", "\r\n"),
+            "CR": lambda c: c.replace("\n", "\r"),
+            "LF": lambda c: c.replace("\r", "\n"),
+        }[eol]
+        self.menu_key = CTRL_T
+        self.exit_key = CTRL_RBRACKET
+        self._pressed_menu_key = False
+
+    def parse(self, key):
+        ret = None
+        if self._pressed_menu_key:
+            ret = self._handle_menu_key(key)
+        elif key == self.menu_key:
+            self._pressed_menu_key = True
+        elif key == self.exit_key:
+            ret = (TAG_CMD, CMD_STOP)
+        else:
+            key = self.translate_eol(key)
+            ret = (TAG_KEY, key)
+        return ret
+
+    def _handle_menu_key(self, c):
+        ret = None
+        if c == self.exit_key or c == self.menu_key:  # send verbatim
+            ret = (TAG_KEY, c)
+        elif c in [CTRL_H, 'h', 'H', '?']:
+            red_print(self.get_help_text())
+        elif c == CTRL_R:  # Reset device via RTS
+            ret = (TAG_CMD, CMD_RESET)
+        elif c == CTRL_F:  # Recompile & upload
+            ret = (TAG_CMD, CMD_MAKE)
+        elif c in [CTRL_A, 'a', 'A']:  # Recompile & upload app only
+            # "CTRL-A" cannot be captured with the default settings of the Windows command line, therefore, "A" can be used
+            # instead
+            ret = (TAG_CMD, CMD_APP_FLASH)
+        elif c == CTRL_Y:  # Toggle output display
+            ret = (TAG_CMD, CMD_OUTPUT_TOGGLE)
+        elif c == CTRL_L:  # Toggle saving output into file
+            ret = (TAG_CMD, CMD_TOGGLE_LOGGING)
+        elif c == CTRL_P:
+            yellow_print("Pause app (enter bootloader mode), press Ctrl-T Ctrl-R to restart")
+            # to fast trigger pause without press menu key
+            ret = (TAG_CMD, CMD_ENTER_BOOT)
+        else:
+            red_print('--- unknown menu character {} --'.format(key_description(c)))
+
+        self._pressed_menu_key = False
+        return ret
+
+    def get_help_text(self):
+        text = """\
+            --- idf_monitor ({version}) - ESP-IDF monitor tool
+            --- based on miniterm from pySerial
+            ---
+            --- {exit:8} Exit program
+            --- {menu:8} Menu escape key, followed by:
+            --- Menu keys:
+            ---    {menu:14} Send the menu character itself to remote
+            ---    {exit:14} Send the exit character itself to remote
+            ---    {reset:14} Reset target board via RTS line
+            ---    {makecmd:14} Build & flash project
+            ---    {appmake:14} Build & flash app only
+            ---    {output:14} Toggle output display
+            ---    {log:14} Toggle saving output into file
+            ---    {pause:14} Reset target into bootloader to pause app via RTS line
+        """.format(version=__version__,
+                   exit=key_description(self.exit_key),
+                   menu=key_description(self.menu_key),
+                   reset=key_description(CTRL_R),
+                   makecmd=key_description(CTRL_F),
+                   appmake=key_description(CTRL_A) + ' (or A)',
+                   output=key_description(CTRL_Y),
+                   log=key_description(CTRL_L),
+                   pause=key_description(CTRL_P))
+        return textwrap.dedent(text)
+
+    def get_next_action_text(self):
+        text = """\
+            --- Press {} to exit monitor.
+            --- Press {} to build & flash project.
+            --- Press {} to build & flash app.
+            --- Press any other key to resume monitor (resets target).
+        """.format(key_description(self.exit_key),
+                   key_description(CTRL_F),
+                   key_description(CTRL_A))
+        return textwrap.dedent(text)
+
+    def parse_next_action_key(self, c):
+        ret = None
+        if c == self.exit_key:
+            ret = (TAG_CMD, CMD_STOP)
+        elif c == CTRL_F:  # Recompile & upload
+            ret = (TAG_CMD, CMD_MAKE)
+        elif c in [CTRL_A, 'a', 'A']:  # Recompile & upload app only
+            # "CTRL-A" cannot be captured with the default settings of the Windows command line, therefore, "A" can be used
+            # instead
+            ret = (TAG_CMD, CMD_APP_FLASH)
+        return ret
+
+
 class SerialReader(StoppableThread):
     """ Read serial data from the serial port and push to the
     event queue, until stopped.
@@ -313,6 +438,7 @@ class Monitor(object):
     def __init__(self, serial_instance, elf_file, print_filter, make="make", toolchain_prefix=DEFAULT_TOOLCHAIN_PREFIX, eol="CRLF"):
         super(Monitor, self).__init__()
         self.event_queue = queue.Queue()
+        self.cmd_queue = queue.Queue()
         self.console = miniterm.Console()
         if os.name == 'nt':
             sys.stderr = ANSIColorConverter(sys.stderr, decode_output=True)
@@ -331,7 +457,8 @@ class Monitor(object):
 
         socket_mode = serial_instance.port.startswith("socket://")  # testing hook - data from serial can make exit the monitor
         self.serial = serial_instance
-        self.console_reader = ConsoleReader(self.console, self.event_queue, socket_mode)
+        self.console_parser = ConsoleParser(eol)
+        self.console_reader = ConsoleReader(self.console, self.event_queue, self.cmd_queue, self.console_parser, socket_mode)
         self.serial_reader = SerialReader(self.serial, self.event_queue)
         self.elf_file = elf_file
         if not os.path.exists(make):
@@ -339,17 +466,8 @@ class Monitor(object):
         else:
             self.make = make
         self.toolchain_prefix = toolchain_prefix
-        self.menu_key = CTRL_T
-        self.exit_key = CTRL_RBRACKET
-
-        self.translate_eol = {
-            "CRLF": lambda c: c.replace("\n", "\r\n"),
-            "CR": lambda c: c.replace("\n", "\r"),
-            "LF": lambda c: c.replace("\r", "\n"),
-        }[eol]
 
         # internal state
-        self._pressed_menu_key = False
         self._last_line_part = b""
         self._gdb_buffer = b""
         self._pc_address_buffer = b""
@@ -368,9 +486,24 @@ class Monitor(object):
         self.serial_reader.start()
         try:
             while self.console_reader.alive and self.serial_reader.alive:
-                (event_tag, data) = self.event_queue.get()
-                if event_tag == TAG_KEY:
-                    self.handle_key(data)
+                try:
+                    item = self.cmd_queue.get_nowait()
+                except queue.Empty:
+                    try:
+                        item = self.event_queue.get(False, 0.001)
+                    except queue.Empty:
+                        continue
+
+                (event_tag, data) = item
+                if event_tag == TAG_CMD:
+                    self.handle_commands(data)
+                elif event_tag == TAG_KEY:
+                    try:
+                        self.serial.write(codecs.encode(data))
+                    except serial.SerialException:
+                        pass  # this shouldn't happen, but sometimes port has closed in serial thread
+                    except UnicodeEncodeError:
+                        pass  # this can happen if a non-ascii character was passed, ignoring
                 elif event_tag == TAG_SERIAL:
                     self.handle_serial_input(data)
                     if self._invoke_processing_last_line_timer is not None:
@@ -400,24 +533,6 @@ class Monitor(object):
                 pass
             sys.stderr.write(ANSI_NORMAL + "\n")
 
-    def handle_key(self, key):
-        if self._pressed_menu_key:
-            self.handle_menu_key(key)
-            self._pressed_menu_key = False
-        elif key == self.menu_key:
-            self._pressed_menu_key = True
-        elif key == self.exit_key:
-            self.console_reader.stop()
-            self.serial_reader.stop()
-        else:
-            try:
-                key = self.translate_eol(key)
-                self.serial.write(codecs.encode(key))
-            except serial.SerialException:
-                pass  # this shouldn't happen, but sometimes port has closed in serial thread
-            except UnicodeEncodeError:
-                pass  # this can happen if a non-ascii character was passed, ignoring
-
     def handle_serial_input(self, data, finalize_line=False):
         sp = data.split(b'\n')
         if self._last_line_part != b"":
@@ -429,7 +544,7 @@ class Monitor(object):
             self._last_line_part = sp.pop()
         for line in sp:
             if line != b"":
-                if self._serial_check_exit and line == self.exit_key.encode('latin-1'):
+                if self._serial_check_exit and line == self.console_parser.exit_key.encode('latin-1'):
                     raise SerialStopException()
                 if self._force_line_print or self._line_matcher.match(line.decode(errors="ignore")):
                     self._print(line + b'\n')
@@ -465,65 +580,6 @@ class Monitor(object):
         for m in re.finditer(MATCH_PCADDR, line.decode(errors="ignore")):
             self.lookup_pc_address(m.group())
 
-    def handle_menu_key(self, c):
-        if c == self.exit_key or c == self.menu_key:  # send verbatim
-            self.serial.write(codecs.encode(c))
-        elif c in [CTRL_H, 'h', 'H', '?']:
-            red_print(self.get_help_text())
-        elif c == CTRL_R:  # Reset device via RTS
-            self.serial.setRTS(True)
-            time.sleep(0.2)
-            self.serial.setRTS(False)
-            self.output_enable(True)
-        elif c == CTRL_F:  # Recompile & upload
-            self.run_make("flash")
-        elif c in [CTRL_A, 'a', 'A']:  # Recompile & upload app only
-            # "CTRL-A" cannot be captured with the default settings of the Windows command line, therefore, "A" can be used
-            # instead
-            self.run_make("app-flash")
-        elif c == CTRL_Y:  # Toggle output display
-            self.output_toggle()
-        elif c == CTRL_L:  # Toggle saving output into file
-            self.toggle_logging()
-        elif c == CTRL_P:
-            yellow_print("Pause app (enter bootloader mode), press Ctrl-T Ctrl-R to restart")
-            # to fast trigger pause without press menu key
-            self.serial.setDTR(False)  # IO0=HIGH
-            self.serial.setRTS(True)   # EN=LOW, chip in reset
-            time.sleep(1.3)  # timeouts taken from esptool.py, includes esp32r0 workaround. defaults: 0.1
-            self.serial.setDTR(True)   # IO0=LOW
-            self.serial.setRTS(False)  # EN=HIGH, chip out of reset
-            time.sleep(0.45)  # timeouts taken from esptool.py, includes esp32r0 workaround. defaults: 0.05
-            self.serial.setDTR(False)  # IO0=HIGH, done
-        else:
-            red_print('--- unknown menu character {} --'.format(key_description(c)))
-
-    def get_help_text(self):
-        return """
---- idf_monitor ({version}) - ESP-IDF monitor tool
---- based on miniterm from pySerial
----
---- {exit:8} Exit program
---- {menu:8} Menu escape key, followed by:
---- Menu keys:
----    {menu:14} Send the menu character itself to remote
----    {exit:14} Send the exit character itself to remote
----    {reset:14} Reset target board via RTS line
----    {makecmd:14} Build & flash project
----    {appmake:14} Build & flash app only
----    {output:14} Toggle output display
----    {log:14} Toggle saving output into file
----    {pause:14} Reset target into bootloader to pause app via RTS line
-""".format(version=__version__,
-           exit=key_description(self.exit_key),
-           menu=key_description(self.menu_key),
-           reset=key_description(CTRL_R),
-           makecmd=key_description(CTRL_F),
-           appmake=key_description(CTRL_A) + ' (or A)',
-           output=key_description(CTRL_Y),
-           log=key_description(CTRL_L),
-           pause=key_description(CTRL_P))
-
     def __enter__(self):
         """ Use 'with self' to temporarily disable monitoring behaviour """
         self.serial_reader.stop()
@@ -537,25 +593,22 @@ class Monitor(object):
     def prompt_next_action(self, reason):
         self.console.setup()  # set up console to trap input characters
         try:
-            red_print("""
---- {}
---- Press {} to exit monitor.
---- Press {} to build & flash project.
---- Press {} to build & flash app.
---- Press any other key to resume monitor (resets target).""".format(reason,
-                                                                     key_description(self.exit_key),
-                                                                     key_description(CTRL_F),
-                                                                     key_description(CTRL_A)))
+            red_print("--- {}".format(reason))
+            red_print(self.console_parser.get_next_action_text())
+
             k = CTRL_T  # ignore CTRL-T here, so people can muscle-memory Ctrl-T Ctrl-F, etc.
             while k == CTRL_T:
                 k = self.console.getkey()
         finally:
             self.console.cleanup()
-        if k == self.exit_key:
-            self.event_queue.put((TAG_KEY, k))
-        elif k in [CTRL_F, CTRL_A]:
-            self.event_queue.put((TAG_KEY, self.menu_key))
-            self.event_queue.put((TAG_KEY, k))
+        ret = self.console_parser.parse_next_action_key(k)
+        if ret is not None:
+            cmd = ret[1]
+            if cmd == CMD_STOP:
+                # the stop command should be handled last
+                self.event_queue.put(ret)
+            else:
+                self.cmd_queue.put(ret)
 
     def run_make(self, target):
         with self:
@@ -676,6 +729,34 @@ class Monitor(object):
                 # don't fill-up the screen with the previous errors (probably consequent prints would fail also)
                 self.stop_logging()
 
+    def handle_commands(self, cmd):
+        if cmd == CMD_STOP:
+            self.console_reader.stop()
+            self.serial_reader.stop()
+        elif cmd == CMD_RESET:
+            self.serial.setRTS(True)
+            time.sleep(0.2)
+            self.serial.setRTS(False)
+            self.output_enable(True)
+        elif cmd == CMD_MAKE:
+            self.run_make("flash")
+        elif cmd == CMD_APP_FLASH:
+            self.run_make("app-flash")
+        elif cmd == CMD_OUTPUT_TOGGLE:
+            self.output_toggle()
+        elif cmd == CMD_TOGGLE_LOGGING:
+            self.toggle_logging()
+        elif cmd == CMD_ENTER_BOOT:
+            self.serial.setDTR(False)  # IO0=HIGH
+            self.serial.setRTS(True)   # EN=LOW, chip in reset
+            time.sleep(1.3)  # timeouts taken from esptool.py, includes esp32r0 workaround. defaults: 0.1
+            self.serial.setDTR(True)   # IO0=LOW
+            self.serial.setRTS(False)  # EN=HIGH, chip out of reset
+            time.sleep(0.45)  # timeouts taken from esptool.py, includes esp32r0 workaround. defaults: 0.05
+            self.serial.setDTR(False)  # IO0=HIGH, done
+        else:
+            raise RuntimeError("Bad command data %d" % (cmd))
+
 
 def main():
     parser = argparse.ArgumentParser("idf_monitor - a serial output monitor for esp-idf")
@@ -748,9 +829,9 @@ def main():
     yellow_print('--- idf_monitor on {p.name} {p.baudrate} ---'.format(
         p=serial_instance))
     yellow_print('--- Quit: {} | Menu: {} | Help: {} followed by {} ---'.format(
-        key_description(monitor.exit_key),
-        key_description(monitor.menu_key),
-        key_description(monitor.menu_key),
+        key_description(monitor.console_parser.exit_key),
+        key_description(monitor.console_parser.menu_key),
+        key_description(monitor.console_parser.menu_key),
         key_description(CTRL_H)))
     if args.print_filter != DEFAULT_PRINT_FILTER:
         yellow_print('--- Print filter: {} ---'.format(args.print_filter))