]> granicus.if.org Git - esp-idf/blob - tools/idf_monitor.py
Merge branch 'feature/idf-fs-profile-modifications' into 'master'
[esp-idf] / tools / idf_monitor.py
1 #!/usr/bin/env python
2 #
3 # esp-idf serial output monitor tool. Does some helpful things:
4 # - Looks up hex addresses in ELF file with addr2line
5 # - Reset ESP32 via serial RTS line (Ctrl-T Ctrl-R)
6 # - Run "make flash" (Ctrl-T Ctrl-F)
7 # - Run "make app-flash" (Ctrl-T Ctrl-A)
8 # - If gdbstub output is detected, gdb is automatically loaded
9 #
10 # Copyright 2015-2016 Espressif Systems (Shanghai) PTE LTD
11 #
12 # Licensed under the Apache License, Version 2.0 (the "License");
13 # you may not use this file except in compliance with the License.
14 # You may obtain a copy of the License at
15 #
16 #     http://www.apache.org/licenses/LICENSE-2.0
17 #
18 # Unless required by applicable law or agreed to in writing, software
19 # distributed under the License is distributed on an "AS IS" BASIS,
20 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
21 # See the License for the specific language governing permissions and
22 # limitations under the License.
23 #
24 # Contains elements taken from miniterm "Very simple serial terminal" which
25 # is part of pySerial. https://github.com/pyserial/pyserial
26 # (C)2002-2015 Chris Liechti <cliechti@gmx.net>
27 #
28 # Originally released under BSD-3-Clause license.
29 #
30 from __future__ import print_function, division
31 import subprocess
32 import argparse
33 import codecs
34 import re
35 import os
36 try:
37     import queue
38 except ImportError:
39     import Queue as queue
40 import time
41 import sys
42 import serial
43 import serial.tools.miniterm as miniterm
44 import threading
45 import ctypes
46 import types
47 from distutils.version import StrictVersion
48
49 key_description = miniterm.key_description
50
51 # Control-key characters
52 CTRL_A = '\x01'
53 CTRL_B = '\x02'
54 CTRL_F = '\x06'
55 CTRL_H = '\x08'
56 CTRL_R = '\x12'
57 CTRL_T = '\x14'
58 CTRL_Y = '\x19'
59 CTRL_P = '\x10'
60 CTRL_RBRACKET = '\x1d'  # Ctrl+]
61
62 # ANSI terminal codes (if changed, regular expressions in LineMatcher need to be udpated)
63 ANSI_RED = '\033[1;31m'
64 ANSI_YELLOW = '\033[0;33m'
65 ANSI_NORMAL = '\033[0m'
66
67 def color_print(message, color):
68     """ Print a message to stderr with colored highlighting """
69     sys.stderr.write("%s%s%s\n" % (color, message,  ANSI_NORMAL))
70
71 def yellow_print(message):
72     color_print(message, ANSI_YELLOW)
73
74 def red_print(message):
75     color_print(message, ANSI_RED)
76
77 __version__ = "1.1"
78
79 # Tags for tuples in queues
80 TAG_KEY = 0
81 TAG_SERIAL = 1
82 TAG_SERIAL_FLUSH = 2
83
84 # regex matches an potential PC value (0x4xxxxxxx)
85 MATCH_PCADDR = re.compile(r'0x4[0-9a-f]{7}', re.IGNORECASE)
86
87 DEFAULT_TOOLCHAIN_PREFIX = "xtensa-esp32-elf-"
88
89 DEFAULT_PRINT_FILTER = ""
90
91 class StoppableThread(object):
92     """
93     Provide a Thread-like class which can be 'cancelled' via a subclass-provided
94     cancellation method.
95
96     Can be started and stopped multiple times.
97
98     Isn't an instance of type Thread because Python Thread objects can only be run once
99     """
100     def __init__(self):
101         self._thread = None
102
103     @property
104     def alive(self):
105         """
106         Is 'alive' whenever the internal thread object exists
107         """
108         return self._thread is not None
109
110     def start(self):
111         if self._thread is None:
112             self._thread = threading.Thread(target=self._run_outer)
113             self._thread.start()
114
115     def _cancel(self):
116         pass # override to provide cancellation functionality
117
118     def run(self):
119         pass # override for the main thread behaviour
120
121     def _run_outer(self):
122         try:
123             self.run()
124         finally:
125             self._thread = None
126
127     def stop(self):
128         if self._thread is not None:
129             old_thread = self._thread
130             self._thread = None
131             self._cancel()
132             old_thread.join()
133
134 class ConsoleReader(StoppableThread):
135     """ Read input keys from the console and push them to the queue,
136     until stopped.
137     """
138     def __init__(self, console, event_queue, test_mode):
139         super(ConsoleReader, self).__init__()
140         self.console = console
141         self.event_queue = event_queue
142         self.test_mode = test_mode
143
144     def run(self):
145         self.console.setup()
146         try:
147             while self.alive:
148                 try:
149                     if os.name == 'nt':
150                         # Windows kludge: because the console.cancel() method doesn't
151                         # seem to work to unblock getkey() on the Windows implementation.
152                         #
153                         # So we only call getkey() if we know there's a key waiting for us.
154                         import msvcrt
155                         while not msvcrt.kbhit() and self.alive:
156                             time.sleep(0.1)
157                         if not self.alive:
158                             break
159                     elif self.test_mode:
160                         # In testing mode the stdin is connected to PTY but is not used for input anything. For PTY
161                         # the canceling by fcntl.ioctl isn't working and would hang in self.console.getkey().
162                         # Therefore, we avoid calling it.
163                         while self.alive:
164                             time.sleep(0.1)
165                         break
166                     c = self.console.getkey()
167                 except KeyboardInterrupt:
168                     c = '\x03'
169                 if c is not None:
170                     self.event_queue.put((TAG_KEY, c), False)
171         finally:
172             self.console.cleanup()
173
174     def _cancel(self):
175         if os.name == 'posix' and not self.test_mode:
176             # this is the way cancel() is implemented in pyserial 3.3 or newer,
177             # older pyserial (3.1+) has cancellation implemented via 'select',
178             # which does not work when console sends an escape sequence response
179             #
180             # even older pyserial (<3.1) does not have this method
181             #
182             # on Windows there is a different (also hacky) fix, applied above.
183             #
184             # note that TIOCSTI is not implemented in WSL / bash-on-Windows.
185             # TODO: introduce some workaround to make it work there.
186             #
187             # Note: This would throw exception in testing mode when the stdin is connected to PTY.
188             import fcntl, termios
189             fcntl.ioctl(self.console.fd, termios.TIOCSTI, b'\0')
190
191 class SerialReader(StoppableThread):
192     """ Read serial data from the serial port and push to the
193     event queue, until stopped.
194     """
195     def __init__(self, serial, event_queue):
196         super(SerialReader, self).__init__()
197         self.baud = serial.baudrate
198         self.serial = serial
199         self.event_queue = event_queue
200         if not hasattr(self.serial, 'cancel_read'):
201             # enable timeout for checking alive flag,
202             # if cancel_read not available
203             self.serial.timeout = 0.25
204
205     def run(self):
206         if not self.serial.is_open:
207             self.serial.baudrate = self.baud
208             self.serial.rts = True  # Force an RTS reset on open
209             self.serial.open()
210             self.serial.rts = False
211         try:
212             while self.alive:
213                 data = self.serial.read(self.serial.in_waiting or 1)
214                 if len(data):
215                     self.event_queue.put((TAG_SERIAL, data), False)
216         finally:
217             self.serial.close()
218
219     def _cancel(self):
220         if hasattr(self.serial, 'cancel_read'):
221             try:
222                 self.serial.cancel_read()
223             except:
224                 pass
225
226 class LineMatcher:
227     """
228     Assembles a dictionary of filtering rules based on the --print_filter
229     argument of idf_monitor. Then later it is used to match lines and
230     determine whether they should be shown on screen or not.
231     """
232     LEVEL_N = 0
233     LEVEL_E = 1
234     LEVEL_W = 2
235     LEVEL_I = 3
236     LEVEL_D = 4
237     LEVEL_V = 5
238
239     level = {'N': LEVEL_N, 'E': LEVEL_E, 'W': LEVEL_W, 'I': LEVEL_I, 'D': LEVEL_D,
240             'V': LEVEL_V, '*': LEVEL_V, '': LEVEL_V}
241
242     def __init__(self, print_filter):
243         self._dict = dict()
244         self._re = re.compile(r'^(?:\033\[[01];?[0-9]+m?)?([EWIDV]) \([0-9]+\) ([^:]+): ')
245         items = print_filter.split()
246         if len(items) == 0:
247             self._dict["*"] = self.LEVEL_V # default is to print everything
248         for f in items:
249             s = f.split(r':')
250             if len(s) == 1:
251                 # specifying no warning level defaults to verbose level
252                 lev = self.LEVEL_V
253             elif len(s) == 2:
254                 if len(s[0]) == 0:
255                     raise ValueError('No tag specified in filter ' + f)
256                 try:
257                     lev = self.level[s[1].upper()]
258                 except KeyError:
259                     raise ValueError('Unknown warning level in filter ' + f)
260             else:
261                 raise ValueError('Missing ":" in filter ' + f)
262             self._dict[s[0]] = lev
263     def match(self, line):
264         try:
265             m = self._re.search(line)
266             if m:
267                 lev = self.level[m.group(1)]
268                 if m.group(2) in self._dict:
269                     return self._dict[m.group(2)] >= lev
270                 return self._dict.get("*", self.LEVEL_N) >= lev
271         except (KeyError, IndexError):
272             # Regular line written with something else than ESP_LOG*
273             # or an empty line.
274             pass
275         # We need something more than "*.N" for printing.
276         return self._dict.get("*", self.LEVEL_N) > self.LEVEL_N
277
278 class SerialStopException(Exception):
279     """
280     This exception is used for stopping the IDF monitor in testing mode.
281     """
282     pass
283
284 class Monitor(object):
285     """
286     Monitor application main class.
287
288     This was originally derived from miniterm.Miniterm, but it turned out to be easier to write from scratch for this
289     purpose.
290
291     Main difference is that all event processing happens in the main thread, not the worker threads.
292     """
293     def __init__(self, serial_instance, elf_file, print_filter, make="make", toolchain_prefix=DEFAULT_TOOLCHAIN_PREFIX, eol="CRLF"):
294         super(Monitor, self).__init__()
295         self.event_queue = queue.Queue()
296         self.console = miniterm.Console()
297         if os.name == 'nt':
298             sys.stderr = ANSIColorConverter(sys.stderr)
299             self.console.output = ANSIColorConverter(self.console.output)
300             self.console.byte_output = ANSIColorConverter(self.console.byte_output)
301
302         if StrictVersion(serial.VERSION) < StrictVersion('3.3.0'):
303             # Use Console.getkey implementation from 3.3.0 (to be in sync with the ConsoleReader._cancel patch above)
304             def getkey_patched(self):
305                 c = self.enc_stdin.read(1)
306                 if c == unichr(0x7f):
307                     c = unichr(8)    # map the BS key (which yields DEL) to backspace
308                 return c
309
310             self.console.getkey = types.MethodType(getkey_patched, self.console)
311
312         socket_mode = serial_instance.port.startswith("socket://") # testing hook - data from serial can make exit the monitor
313         self.serial = serial_instance
314         self.console_reader = ConsoleReader(self.console, self.event_queue, socket_mode)
315         self.serial_reader = SerialReader(self.serial, self.event_queue)
316         self.elf_file = elf_file
317         self.make = make
318         self.toolchain_prefix = toolchain_prefix
319         self.menu_key = CTRL_T
320         self.exit_key = CTRL_RBRACKET
321
322         self.translate_eol = {
323             "CRLF": lambda c: c.replace(b"\n", b"\r\n"),
324             "CR":   lambda c: c.replace(b"\n", b"\r"),
325             "LF":   lambda c: c.replace(b"\r", b"\n"),
326         }[eol]
327
328         # internal state
329         self._pressed_menu_key = False
330         self._last_line_part = b""
331         self._gdb_buffer = b""
332         self._pc_address_buffer = b""
333         self._line_matcher = LineMatcher(print_filter)
334         self._invoke_processing_last_line_timer = None
335         self._force_line_print = False
336         self._output_enabled = True
337         self._serial_check_exit = socket_mode
338
339     def invoke_processing_last_line(self):
340         self.event_queue.put((TAG_SERIAL_FLUSH, b''), False)
341
342     def main_loop(self):
343         self.console_reader.start()
344         self.serial_reader.start()
345         try:
346             while self.console_reader.alive and self.serial_reader.alive:
347                 (event_tag, data) = self.event_queue.get()
348                 if event_tag == TAG_KEY:
349                     self.handle_key(data)
350                 elif event_tag == TAG_SERIAL:
351                     self.handle_serial_input(data)
352                     if self._invoke_processing_last_line_timer is not None:
353                         self._invoke_processing_last_line_timer.cancel()
354                     self._invoke_processing_last_line_timer = threading.Timer(0.1, self.invoke_processing_last_line)
355                     self._invoke_processing_last_line_timer.start()
356                     # If no futher data is received in the next short period
357                     # of time then the _invoke_processing_last_line_timer
358                     # generates an event which will result in the finishing of
359                     # the last line. This is fix for handling lines sent
360                     # without EOL.
361                 elif event_tag == TAG_SERIAL_FLUSH:
362                     self.handle_serial_input(data, finalize_line=True)
363                 else:
364                     raise RuntimeError("Bad event data %r" % ((event_tag,data),))
365         except SerialStopException:
366             pass
367         finally:
368             try:
369                 self.console_reader.stop()
370                 self.serial_reader.stop()
371                 # Cancelling _invoke_processing_last_line_timer is not
372                 # important here because receiving empty data doesn't matter.
373                 self._invoke_processing_last_line_timer = None
374             except:
375                 pass
376             sys.stderr.write(ANSI_NORMAL + "\n")
377
378     def handle_key(self, key):
379         if self._pressed_menu_key:
380             self.handle_menu_key(key)
381             self._pressed_menu_key = False
382         elif key == self.menu_key:
383             self._pressed_menu_key = True
384         elif key == self.exit_key:
385             self.console_reader.stop()
386             self.serial_reader.stop()
387         else:
388             try:
389                 key = self.translate_eol(key)
390                 self.serial.write(codecs.encode(key))
391             except serial.SerialException:
392                 pass # this shouldn't happen, but sometimes port has closed in serial thread
393             except UnicodeEncodeError:
394                 pass # this can happen if a non-ascii character was passed, ignoring
395
396     def handle_serial_input(self, data, finalize_line=False):
397         sp = data.split(b'\n')
398         if self._last_line_part != b"":
399             # add unprocessed part from previous "data" to the first line
400             sp[0] = self._last_line_part + sp[0]
401             self._last_line_part = b""
402         if sp[-1] != b"":
403             # last part is not a full line
404             self._last_line_part = sp.pop()
405         for line in sp:
406             if line != b"":
407                 if self._serial_check_exit and line == self.exit_key:
408                     raise SerialStopException()
409                 if self._output_enabled and (self._force_line_print or self._line_matcher.match(line)):
410                     self.console.write_bytes(line + b'\n')
411                     self.handle_possible_pc_address_in_line(line)
412                 self.check_gdbstub_trigger(line)
413                 self._force_line_print = False
414         # Now we have the last part (incomplete line) in _last_line_part. By
415         # default we don't touch it and just wait for the arrival of the rest
416         # of the line. But after some time when we didn't received it we need
417         # to make a decision.
418         if self._last_line_part != b"":
419             if self._force_line_print or (finalize_line and self._line_matcher.match(self._last_line_part)):
420                 self._force_line_print = True;
421                 if self._output_enabled:
422                     self.console.write_bytes(self._last_line_part)
423                     self.handle_possible_pc_address_in_line(self._last_line_part)
424                 self.check_gdbstub_trigger(self._last_line_part)
425                 # It is possible that the incomplete line cuts in half the PC
426                 # address. A small buffer is kept and will be used the next time
427                 # handle_possible_pc_address_in_line is invoked to avoid this problem.
428                 # MATCH_PCADDR matches 10 character long addresses. Therefore, we
429                 # keep the last 9 characters.
430                 self._pc_address_buffer = self._last_line_part[-9:]
431                 # GDB sequence can be cut in half also. GDB sequence is 7
432                 # characters long, therefore, we save the last 6 characters.
433                 self._gdb_buffer = self._last_line_part[-6:]
434                 self._last_line_part = b""
435         # else: keeping _last_line_part and it will be processed the next time
436         # handle_serial_input is invoked
437
438     def handle_possible_pc_address_in_line(self, line):
439         line = self._pc_address_buffer + line
440         self._pc_address_buffer = b""
441         for m in re.finditer(MATCH_PCADDR, line):
442             self.lookup_pc_address(m.group())
443
444     def handle_menu_key(self, c):
445         if c == self.exit_key or c == self.menu_key:  # send verbatim
446             self.serial.write(codecs.encode(c))
447         elif c in [ CTRL_H, 'h', 'H', '?' ]:
448             red_print(self.get_help_text())
449         elif c == CTRL_R:  # Reset device via RTS
450             self.serial.setRTS(True)
451             time.sleep(0.2)
452             self.serial.setRTS(False)
453             self.output_enable(True)
454         elif c == CTRL_F:  # Recompile & upload
455             self.run_make("flash")
456         elif c == CTRL_A:  # Recompile & upload app only
457             self.run_make("app-flash")
458         elif c == CTRL_Y:  # Toggle output display
459             self.output_toggle()
460         elif c == CTRL_P:
461             yellow_print("Pause app (enter bootloader mode), press Ctrl-T Ctrl-R to restart")
462             # to fast trigger pause without press menu key
463             self.serial.setDTR(False)  # IO0=HIGH
464             self.serial.setRTS(True)   # EN=LOW, chip in reset
465             time.sleep(1.3) # timeouts taken from esptool.py, includes esp32r0 workaround. defaults: 0.1
466             self.serial.setDTR(True)   # IO0=LOW
467             self.serial.setRTS(False)  # EN=HIGH, chip out of reset
468             time.sleep(0.45) # timeouts taken from esptool.py, includes esp32r0 workaround. defaults: 0.05
469             self.serial.setDTR(False)  # IO0=HIGH, done
470         else:
471             red_print('--- unknown menu character {} --'.format(key_description(c)))
472
473     def get_help_text(self):
474         return """
475 --- idf_monitor ({version}) - ESP-IDF monitor tool
476 --- based on miniterm from pySerial
477 ---
478 --- {exit:8} Exit program
479 --- {menu:8} Menu escape key, followed by:
480 --- Menu keys:
481 ---    {menu:7} Send the menu character itself to remote
482 ---    {exit:7} Send the exit character itself to remote
483 ---    {reset:7} Reset target board via RTS line
484 ---    {make:7} Run 'make flash' to build & flash
485 ---    {appmake:7} Run 'make app-flash to build & flash app
486 ---    {output:7} Toggle output display
487 ---    {pause:7} Reset target into bootloader to pause app via RTS line
488 """.format(version=__version__,
489            exit=key_description(self.exit_key),
490            menu=key_description(self.menu_key),
491            reset=key_description(CTRL_R),
492            make=key_description(CTRL_F),
493            appmake=key_description(CTRL_A),
494            output=key_description(CTRL_Y),
495            pause=key_description(CTRL_P),
496            )
497
498     def __enter__(self):
499         """ Use 'with self' to temporarily disable monitoring behaviour """
500         self.serial_reader.stop()
501         self.console_reader.stop()
502
503     def __exit__(self, *args, **kwargs):
504         """ Use 'with self' to temporarily disable monitoring behaviour """
505         self.console_reader.start()
506         self.serial_reader.start()
507
508     def prompt_next_action(self, reason):
509         self.console.setup()  # set up console to trap input characters
510         try:
511             red_print("""
512 --- {}
513 --- Press {} to exit monitor.
514 --- Press {} to run 'make flash'.
515 --- Press {} to run 'make app-flash'.
516 --- Press any other key to resume monitor (resets target).""".format(reason,
517                                                                      key_description(self.exit_key),
518                                                                      key_description(CTRL_F),
519                                                                      key_description(CTRL_A)))
520             k = CTRL_T  # ignore CTRL-T here, so people can muscle-memory Ctrl-T Ctrl-F, etc.
521             while k == CTRL_T:
522                 k = self.console.getkey()
523         finally:
524             self.console.cleanup()
525         if k == self.exit_key:
526             self.event_queue.put((TAG_KEY, k))
527         elif k in [ CTRL_F, CTRL_A ]:
528             self.event_queue.put((TAG_KEY, self.menu_key))
529             self.event_queue.put((TAG_KEY, k))
530
531     def run_make(self, target):
532         with self:
533             yellow_print("Running make %s..." % target)
534             p = subprocess.Popen([self.make,
535                                   target ])
536             try:
537                 p.wait()
538             except KeyboardInterrupt:
539                 p.wait()
540             if p.returncode != 0:
541                 self.prompt_next_action("Build failed")
542             else:
543                 self.output_enable(True)
544
545     def lookup_pc_address(self, pc_addr):
546         translation = subprocess.check_output(
547             ["%saddr2line" % self.toolchain_prefix,
548              "-pfiaC", "-e", self.elf_file, pc_addr],
549             cwd=".")
550         if not "?? ??:0" in translation:
551             yellow_print(translation)
552
553     def check_gdbstub_trigger(self, line):
554         line = self._gdb_buffer + line
555         self._gdb_buffer = b""
556         m = re.search(b"\\$(T..)#(..)", line) # look for a gdb "reason" for a break
557         if m is not None:
558             try:
559                 chsum = sum(ord(p) for p in m.group(1)) & 0xFF
560                 calc_chsum = int(m.group(2), 16)
561             except ValueError:
562                 return  # payload wasn't valid hex digits
563             if chsum == calc_chsum:
564                 self.run_gdb()
565             else:
566                 red_print("Malformed gdb message... calculated checksum %02x received %02x" % (chsum, calc_chsum))
567
568
569     def run_gdb(self):
570         with self:  # disable console control
571             sys.stderr.write(ANSI_NORMAL)
572             try:
573                 process = subprocess.Popen(["%sgdb" % self.toolchain_prefix,
574                                 "-ex", "set serial baud %d" % self.serial.baudrate,
575                                 "-ex", "target remote %s" % self.serial.port,
576                                 "-ex", "interrupt",  # monitor has already parsed the first 'reason' command, need a second
577                                 self.elf_file], cwd=".")
578                 process.wait()
579             except KeyboardInterrupt:
580                 pass  # happens on Windows, maybe other OSes
581             finally:
582                 try:
583                     # on Linux, maybe other OSes, gdb sometimes seems to be alive even after wait() returns...
584                     process.terminate()
585                 except:
586                     pass
587                 try:
588                     # also on Linux, maybe other OSes, gdb sometimes exits uncleanly and breaks the tty mode
589                     subprocess.call(["stty", "sane"])
590                 except:
591                     pass  # don't care if there's no stty, we tried...
592             self.prompt_next_action("gdb exited")
593
594     def output_enable(self, enable):
595         self._output_enabled = enable
596
597     def output_toggle(self):
598         self._output_enabled = not self._output_enabled
599         yellow_print("\nToggle output display: {}, Type Ctrl-T Ctrl-Y to show/disable output again.".format(self._output_enabled))
600
601 def main():
602     parser = argparse.ArgumentParser("idf_monitor - a serial output monitor for esp-idf")
603
604     parser.add_argument(
605         '--port', '-p',
606         help='Serial port device',
607         default=os.environ.get('ESPTOOL_PORT', '/dev/ttyUSB0')
608     )
609
610     parser.add_argument(
611         '--baud', '-b',
612         help='Serial port baud rate',
613         type=int,
614         default=os.environ.get('MONITOR_BAUD', 115200))
615
616     parser.add_argument(
617         '--make', '-m',
618         help='Command to run make',
619         type=str, default='make')
620
621     parser.add_argument(
622         '--toolchain-prefix',
623         help="Triplet prefix to add before cross-toolchain names",
624         default=DEFAULT_TOOLCHAIN_PREFIX)
625
626     parser.add_argument(
627         "--eol",
628         choices=['CR', 'LF', 'CRLF'],
629         type=lambda c: c.upper(),
630         help="End of line to use when sending to the serial port",
631         default='CR')
632
633     parser.add_argument(
634         'elf_file', help='ELF file of application',
635         type=argparse.FileType('rb'))
636
637     parser.add_argument(
638         '--print_filter',
639         help="Filtering string",
640         default=DEFAULT_PRINT_FILTER)
641
642     args = parser.parse_args()
643
644     if args.port.startswith("/dev/tty."):
645         args.port = args.port.replace("/dev/tty.", "/dev/cu.")
646         yellow_print("--- WARNING: Serial ports accessed as /dev/tty.* will hang gdb if launched.")
647         yellow_print("--- Using %s instead..." % args.port)
648
649     serial_instance = serial.serial_for_url(args.port, args.baud,
650                                             do_not_open=True)
651     serial_instance.dtr = False
652     serial_instance.rts = False
653
654     args.elf_file.close()  # don't need this as a file
655
656     # remove the parallel jobserver arguments from MAKEFLAGS, as any
657     # parent make is only running 1 job (monitor), so we can re-spawn
658     # all of the child makes we need (the -j argument remains part of
659     # MAKEFLAGS)
660     try:
661         makeflags = os.environ["MAKEFLAGS"]
662         makeflags = re.sub(r"--jobserver[^ =]*=[0-9,]+ ?", "", makeflags)
663         os.environ["MAKEFLAGS"] = makeflags
664     except KeyError:
665         pass  # not running a make jobserver
666
667     monitor = Monitor(serial_instance, args.elf_file.name, args.print_filter, args.make, args.toolchain_prefix, args.eol)
668
669     yellow_print('--- idf_monitor on {p.name} {p.baudrate} ---'.format(
670         p=serial_instance))
671     yellow_print('--- Quit: {} | Menu: {} | Help: {} followed by {} ---'.format(
672         key_description(monitor.exit_key),
673         key_description(monitor.menu_key),
674         key_description(monitor.menu_key),
675         key_description(CTRL_H)))
676     if args.print_filter != DEFAULT_PRINT_FILTER:
677         yellow_print('--- Print filter: {} ---'.format(args.print_filter))
678
679     monitor.main_loop()
680
681 if os.name == 'nt':
682     # Windows console stuff
683
684     STD_OUTPUT_HANDLE = -11
685     STD_ERROR_HANDLE = -12
686
687     # wincon.h values
688     FOREGROUND_INTENSITY = 8
689     FOREGROUND_GREY = 7
690
691     # matches the ANSI color change sequences that IDF sends
692     RE_ANSI_COLOR = re.compile(b'\033\\[([01]);3([0-7])m')
693
694     # list mapping the 8 ANSI colors (the indexes) to Windows Console colors
695     ANSI_TO_WINDOWS_COLOR = [ 0, 4, 2, 6, 1, 5, 3, 7 ]
696
697     GetStdHandle = ctypes.windll.kernel32.GetStdHandle
698     SetConsoleTextAttribute = ctypes.windll.kernel32.SetConsoleTextAttribute
699
700     class ANSIColorConverter(object):
701         """Class to wrap a file-like output stream, intercept ANSI color codes,
702         and convert them into calls to Windows SetConsoleTextAttribute.
703
704         Doesn't support all ANSI terminal code escape sequences, only the sequences IDF uses.
705
706         Ironically, in Windows this console output is normally wrapped by winpty which will then detect the console text
707         color changes and convert these back to ANSI color codes for MSYS' terminal to display. However this is the
708         least-bad working solution, as winpty doesn't support any "passthrough" mode for raw output.
709         """
710
711         def __init__(self, output):
712             self.output = output
713             self.handle = GetStdHandle(STD_ERROR_HANDLE if self.output == sys.stderr else STD_OUTPUT_HANDLE)
714             self.matched = b''
715
716         def _output_write(self, data):
717             try:
718                 self.output.write(data)
719             except IOError:
720                 # Windows 10 bug since the Fall Creators Update, sometimes writing to console randomly throws
721                 # an exception (however, the character is still written to the screen)
722                 # Ref https://github.com/espressif/esp-idf/issues/1136
723                 pass
724
725         def write(self, data):
726             for b in data:
727                 l = len(self.matched)
728                 if b == '\033':  # ESC
729                     self.matched = b
730                 elif (l == 1 and b == '[') or (1 < l < 7):
731                     self.matched += b
732                     if self.matched == ANSI_NORMAL:  # reset console
733                         SetConsoleTextAttribute(self.handle, FOREGROUND_GREY)
734                         self.matched = b''
735                     elif len(self.matched) == 7:     # could be an ANSI sequence
736                         m = re.match(RE_ANSI_COLOR, self.matched)
737                         if m is not None:
738                             color = ANSI_TO_WINDOWS_COLOR[int(m.group(2))]
739                             if m.group(1) == b'1':
740                                 color |= FOREGROUND_INTENSITY
741                             SetConsoleTextAttribute(self.handle, color)
742                         else:
743                             self._output_write(self.matched) # not an ANSI color code, display verbatim
744                         self.matched = b''
745                 else:
746                     self._output_write(b)
747                     self.matched = b''
748
749         def flush(self):
750             self.output.flush()
751
752
753 if __name__ == "__main__":
754     main()