#
from __future__ import print_function, division
from __future__ import unicode_literals
-from future import standard_library
-standard_library.install_aliases()
from builtins import chr
from builtins import object
from builtins import bytes
ANSI_YELLOW = '\033[0;33m'
ANSI_NORMAL = '\033[0m'
+
def color_print(message, color):
""" Print a message to stderr with colored highlighting """
sys.stderr.write("%s%s%s\n" % (color, message, ANSI_NORMAL))
+
def yellow_print(message):
color_print(message, ANSI_YELLOW)
+
def red_print(message):
color_print(message, ANSI_RED)
+
__version__ = "1.1"
# Tags for tuples in queues
DEFAULT_PRINT_FILTER = ""
+
class StoppableThread(object):
"""
Provide a Thread-like class which can be 'cancelled' via a subclass-provided
self._thread.start()
def _cancel(self):
- pass # override to provide cancellation functionality
+ pass # override to provide cancellation functionality
def run(self):
- pass # override for the main thread behaviour
+ pass # override for the main thread behaviour
def _run_outer(self):
try:
self._cancel()
old_thread.join()
+
class ConsoleReader(StoppableThread):
""" Read input keys from the console and push them to the queue,
until stopped.
# TODO: introduce some workaround to make it work there.
#
# Note: This would throw exception in testing mode when the stdin is connected to PTY.
- import fcntl, termios
+ import fcntl
+ import termios
fcntl.ioctl(self.console.fd, termios.TIOCSTI, b'\0')
+
class SerialReader(StoppableThread):
""" Read serial data from the serial port and push to the
event queue, until stopped.
if hasattr(self.serial, 'cancel_read'):
try:
self.serial.cancel_read()
- except:
+ except Exception:
pass
+
class LineMatcher(object):
"""
Assembles a dictionary of filtering rules based on the --print_filter
LEVEL_V = 5
level = {'N': LEVEL_N, 'E': LEVEL_E, 'W': LEVEL_W, 'I': LEVEL_I, 'D': LEVEL_D,
- 'V': LEVEL_V, '*': LEVEL_V, '': LEVEL_V}
+ 'V': LEVEL_V, '*': LEVEL_V, '': LEVEL_V}
def __init__(self, print_filter):
self._dict = dict()
self._re = re.compile(r'^(?:\033\[[01];?[0-9]+m?)?([EWIDV]) \([0-9]+\) ([^:]+): ')
items = print_filter.split()
if len(items) == 0:
- self._dict["*"] = self.LEVEL_V # default is to print everything
+ self._dict["*"] = self.LEVEL_V # default is to print everything
for f in items:
s = f.split(r':')
if len(s) == 1:
else:
raise ValueError('Missing ":" in filter ' + f)
self._dict[s[0]] = lev
+
def match(self, line):
try:
m = self._re.search(line)
# We need something more than "*.N" for printing.
return self._dict.get("*", self.LEVEL_N) > self.LEVEL_N
+
class SerialStopException(Exception):
"""
This exception is used for stopping the IDF monitor in testing mode.
"""
pass
+
class Monitor(object):
"""
Monitor application main class.
self.console.getkey = types.MethodType(getkey_patched, self.console)
- socket_mode = serial_instance.port.startswith("socket://") # testing hook - data from serial can make exit the monitor
+ socket_mode = serial_instance.port.startswith("socket://") # testing hook - data from serial can make exit the monitor
self.serial = serial_instance
self.console_reader = ConsoleReader(self.console, self.event_queue, socket_mode)
self.serial_reader = SerialReader(self.serial, self.event_queue)
self.translate_eol = {
"CRLF": lambda c: c.replace("\n", "\r\n"),
- "CR": lambda c: c.replace("\n", "\r"),
- "LF": lambda c: c.replace("\r", "\n"),
+ "CR": lambda c: c.replace("\n", "\r"),
+ "LF": lambda c: c.replace("\r", "\n"),
}[eol]
# internal state
# Cancelling _invoke_processing_last_line_timer is not
# important here because receiving empty data doesn't matter.
self._invoke_processing_last_line_timer = None
- except:
+ except Exception:
pass
sys.stderr.write(ANSI_NORMAL + "\n")
key = self.translate_eol(key)
self.serial.write(codecs.encode(key))
except serial.SerialException:
- pass # this shouldn't happen, but sometimes port has closed in serial thread
+ pass # this shouldn't happen, but sometimes port has closed in serial thread
except UnicodeEncodeError:
- pass # this can happen if a non-ascii character was passed, ignoring
+ pass # this can happen if a non-ascii character was passed, ignoring
def handle_serial_input(self, data, finalize_line=False):
sp = data.split(b'\n')
# to make a decision.
if self._last_line_part != b"":
if self._force_line_print or (finalize_line and self._line_matcher.match(self._last_line_part.decode(errors="ignore"))):
- self._force_line_print = True;
+ self._force_line_print = True
if self._output_enabled:
self.console.write_bytes(self._last_line_part)
self.handle_possible_pc_address_in_line(self._last_line_part)
def handle_menu_key(self, c):
if c == self.exit_key or c == self.menu_key: # send verbatim
self.serial.write(codecs.encode(c))
- elif c in [ CTRL_H, 'h', 'H', '?' ]:
+ elif c in [CTRL_H, 'h', 'H', '?']:
red_print(self.get_help_text())
elif c == CTRL_R: # Reset device via RTS
self.serial.setRTS(True)
# to fast trigger pause without press menu key
self.serial.setDTR(False) # IO0=HIGH
self.serial.setRTS(True) # EN=LOW, chip in reset
- time.sleep(1.3) # timeouts taken from esptool.py, includes esp32r0 workaround. defaults: 0.1
+ time.sleep(1.3) # timeouts taken from esptool.py, includes esp32r0 workaround. defaults: 0.1
self.serial.setDTR(True) # IO0=LOW
self.serial.setRTS(False) # EN=HIGH, chip out of reset
- time.sleep(0.45) # timeouts taken from esptool.py, includes esp32r0 workaround. defaults: 0.05
+ time.sleep(0.45) # timeouts taken from esptool.py, includes esp32r0 workaround. defaults: 0.05
self.serial.setDTR(False) # IO0=HIGH, done
else:
red_print('--- unknown menu character {} --'.format(key_description(c)))
makecmd=key_description(CTRL_F),
appmake=key_description(CTRL_A),
output=key_description(CTRL_Y),
- pause=key_description(CTRL_P) )
+ pause=key_description(CTRL_P))
def __enter__(self):
""" Use 'with self' to temporarily disable monitoring behaviour """
--- Press any other key to resume monitor (resets target).""".format(reason,
key_description(self.exit_key),
key_description(CTRL_F),
- key_description(CTRL_A) ))
+ key_description(CTRL_A)))
k = CTRL_T # ignore CTRL-T here, so people can muscle-memory Ctrl-T Ctrl-F, etc.
while k == CTRL_T:
k = self.console.getkey()
self.console.cleanup()
if k == self.exit_key:
self.event_queue.put((TAG_KEY, k))
- elif k in [ CTRL_F, CTRL_A ]:
+ elif k in [CTRL_F, CTRL_A]:
self.event_queue.put((TAG_KEY, self.menu_key))
self.event_queue.put((TAG_KEY, k))
def run_make(self, target):
with self:
if isinstance(self.make, list):
- popen_args = self.make + [ target ]
+ popen_args = self.make + [target]
else:
- popen_args = [ self.make, target ]
+ popen_args = [self.make, target]
yellow_print("Running %s..." % " ".join(popen_args))
p = subprocess.Popen(popen_args)
try:
["%saddr2line" % self.toolchain_prefix,
"-pfiaC", "-e", self.elf_file, pc_addr],
cwd=".")
- if not b"?? ??:0" in translation:
+ if b"?? ??:0" not in translation:
yellow_print(translation.decode())
def check_gdbstub_trigger(self, line):
line = self._gdb_buffer + line
self._gdb_buffer = b""
- m = re.search(b"\\$(T..)#(..)", line) # look for a gdb "reason" for a break
+ m = re.search(b"\\$(T..)#(..)", line) # look for a gdb "reason" for a break
if m is not None:
try:
chsum = sum(ord(bytes([p])) for p in m.group(1)) & 0xFF
else:
red_print("Malformed gdb message... calculated checksum %02x received %02x" % (chsum, calc_chsum))
-
def run_gdb(self):
with self: # disable console control
sys.stderr.write(ANSI_NORMAL)
try:
process = subprocess.Popen(["%sgdb" % self.toolchain_prefix,
- "-ex", "set serial baud %d" % self.serial.baudrate,
- "-ex", "target remote %s" % self.serial.port,
- "-ex", "interrupt", # monitor has already parsed the first 'reason' command, need a second
- self.elf_file], cwd=".")
+ "-ex", "set serial baud %d" % self.serial.baudrate,
+ "-ex", "target remote %s" % self.serial.port,
+ "-ex", "interrupt", # monitor has already parsed the first 'reason' command, need a second
+ self.elf_file], cwd=".")
process.wait()
except KeyboardInterrupt:
pass # happens on Windows, maybe other OSes
try:
# on Linux, maybe other OSes, gdb sometimes seems to be alive even after wait() returns...
process.terminate()
- except:
+ except Exception:
pass
try:
# also on Linux, maybe other OSes, gdb sometimes exits uncleanly and breaks the tty mode
subprocess.call(["stty", "sane"])
- except:
+ except Exception:
pass # don't care if there's no stty, we tried...
self.prompt_next_action("gdb exited")
self._output_enabled = not self._output_enabled
yellow_print("\nToggle output display: {}, Type Ctrl-T Ctrl-Y to show/disable output again.".format(self._output_enabled))
+
def main():
parser = argparse.ArgumentParser("idf_monitor - a serial output monitor for esp-idf")
monitor.main_loop()
+
if os.name == 'nt':
# Windows console stuff
RE_ANSI_COLOR = re.compile(b'\033\\[([01]);3([0-7])m')
# list mapping the 8 ANSI colors (the indexes) to Windows Console colors
- ANSI_TO_WINDOWS_COLOR = [ 0, 4, 2, 6, 1, 5, 3, 7 ]
+ ANSI_TO_WINDOWS_COLOR = [0, 4, 2, 6, 1, 5, 3, 7]
GetStdHandle = ctypes.windll.kernel32.GetStdHandle
SetConsoleTextAttribute = ctypes.windll.kernel32.SetConsoleTextAttribute
data = bytearray(data, 'utf-8')
for b in data:
b = bytes([b])
- l = len(self.matched)
+ length = len(self.matched)
if b == b'\033': # ESC
self.matched = b
- elif (l == 1 and b == b'[') or (1 < l < 7):
+ elif (length == 1 and b == b'[') or (1 < length < 7):
self.matched += b
if self.matched == ANSI_NORMAL.encode('latin-1'): # reset console
# Flush is required only with Python3 - switching color before it is printed would mess up the console
self.flush()
SetConsoleTextAttribute(self.handle, color)
else:
- self._output_write(self.matched) # not an ANSI color code, display verbatim
+ self._output_write(self.matched) # not an ANSI color code, display verbatim
self.matched = b''
else:
self._output_write(b)