]> granicus.if.org Git - esp-idf/blob - tools/idf_monitor.py
Component/bt: read multiple return callback status: ESP_GATT_STACK_RSP
[esp-idf] / tools / idf_monitor.py
1 #!/usr/bin/env python
2 #
3 # esp-idf serial output monitor tool. Does some helpful things:
4 # - Looks up hex addresses in ELF file with addr2line
5 # - Reset ESP32 via serial RTS line (Ctrl-T Ctrl-R)
6 # - Run "make (or idf.py) flash" (Ctrl-T Ctrl-F)
7 # - Run "make (or idf.py) app-flash" (Ctrl-T Ctrl-A)
8 # - If gdbstub output is detected, gdb is automatically loaded
9 #
10 # Copyright 2015-2016 Espressif Systems (Shanghai) PTE LTD
11 #
12 # Licensed under the Apache License, Version 2.0 (the "License");
13 # you may not use this file except in compliance with the License.
14 # You may obtain a copy of the License at
15 #
16 #     http://www.apache.org/licenses/LICENSE-2.0
17 #
18 # Unless required by applicable law or agreed to in writing, software
19 # distributed under the License is distributed on an "AS IS" BASIS,
20 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
21 # See the License for the specific language governing permissions and
22 # limitations under the License.
23 #
24 # Contains elements taken from miniterm "Very simple serial terminal" which
25 # is part of pySerial. https://github.com/pyserial/pyserial
26 # (C)2002-2015 Chris Liechti <cliechti@gmx.net>
27 #
28 # Originally released under BSD-3-Clause license.
29 #
30 from __future__ import print_function, division
31 from __future__ import unicode_literals
32 from future import standard_library
33 standard_library.install_aliases()
34 from builtins import chr
35 from builtins import object
36 from builtins import bytes
37 import subprocess
38 import argparse
39 import codecs
40 import re
41 import os
42 try:
43     import queue
44 except ImportError:
45     import Queue as queue
46 import shlex
47 import time
48 import sys
49 import serial
50 import serial.tools.miniterm as miniterm
51 import threading
52 import ctypes
53 import types
54 from distutils.version import StrictVersion
55
56 key_description = miniterm.key_description
57
58 # Control-key characters
59 CTRL_A = '\x01'
60 CTRL_B = '\x02'
61 CTRL_F = '\x06'
62 CTRL_H = '\x08'
63 CTRL_R = '\x12'
64 CTRL_T = '\x14'
65 CTRL_Y = '\x19'
66 CTRL_P = '\x10'
67 CTRL_RBRACKET = '\x1d'  # Ctrl+]
68
69 # ANSI terminal codes (if changed, regular expressions in LineMatcher need to be udpated)
70 ANSI_RED = '\033[1;31m'
71 ANSI_YELLOW = '\033[0;33m'
72 ANSI_NORMAL = '\033[0m'
73
74 def color_print(message, color):
75     """ Print a message to stderr with colored highlighting """
76     sys.stderr.write("%s%s%s\n" % (color, message,  ANSI_NORMAL))
77
78 def yellow_print(message):
79     color_print(message, ANSI_YELLOW)
80
81 def red_print(message):
82     color_print(message, ANSI_RED)
83
84 __version__ = "1.1"
85
86 # Tags for tuples in queues
87 TAG_KEY = 0
88 TAG_SERIAL = 1
89 TAG_SERIAL_FLUSH = 2
90
91 # regex matches an potential PC value (0x4xxxxxxx)
92 MATCH_PCADDR = re.compile(r'0x4[0-9a-f]{7}', re.IGNORECASE)
93
94 DEFAULT_TOOLCHAIN_PREFIX = "xtensa-esp32-elf-"
95
96 DEFAULT_PRINT_FILTER = ""
97
98 class StoppableThread(object):
99     """
100     Provide a Thread-like class which can be 'cancelled' via a subclass-provided
101     cancellation method.
102
103     Can be started and stopped multiple times.
104
105     Isn't an instance of type Thread because Python Thread objects can only be run once
106     """
107     def __init__(self):
108         self._thread = None
109
110     @property
111     def alive(self):
112         """
113         Is 'alive' whenever the internal thread object exists
114         """
115         return self._thread is not None
116
117     def start(self):
118         if self._thread is None:
119             self._thread = threading.Thread(target=self._run_outer)
120             self._thread.start()
121
122     def _cancel(self):
123         pass # override to provide cancellation functionality
124
125     def run(self):
126         pass # override for the main thread behaviour
127
128     def _run_outer(self):
129         try:
130             self.run()
131         finally:
132             self._thread = None
133
134     def stop(self):
135         if self._thread is not None:
136             old_thread = self._thread
137             self._thread = None
138             self._cancel()
139             old_thread.join()
140
141 class ConsoleReader(StoppableThread):
142     """ Read input keys from the console and push them to the queue,
143     until stopped.
144     """
145     def __init__(self, console, event_queue, test_mode):
146         super(ConsoleReader, self).__init__()
147         self.console = console
148         self.event_queue = event_queue
149         self.test_mode = test_mode
150
151     def run(self):
152         self.console.setup()
153         try:
154             while self.alive:
155                 try:
156                     if os.name == 'nt':
157                         # Windows kludge: because the console.cancel() method doesn't
158                         # seem to work to unblock getkey() on the Windows implementation.
159                         #
160                         # So we only call getkey() if we know there's a key waiting for us.
161                         import msvcrt
162                         while not msvcrt.kbhit() and self.alive:
163                             time.sleep(0.1)
164                         if not self.alive:
165                             break
166                     elif self.test_mode:
167                         # In testing mode the stdin is connected to PTY but is not used for input anything. For PTY
168                         # the canceling by fcntl.ioctl isn't working and would hang in self.console.getkey().
169                         # Therefore, we avoid calling it.
170                         while self.alive:
171                             time.sleep(0.1)
172                         break
173                     c = self.console.getkey()
174                 except KeyboardInterrupt:
175                     c = '\x03'
176                 if c is not None:
177                     self.event_queue.put((TAG_KEY, c), False)
178         finally:
179             self.console.cleanup()
180
181     def _cancel(self):
182         if os.name == 'posix' and not self.test_mode:
183             # this is the way cancel() is implemented in pyserial 3.3 or newer,
184             # older pyserial (3.1+) has cancellation implemented via 'select',
185             # which does not work when console sends an escape sequence response
186             #
187             # even older pyserial (<3.1) does not have this method
188             #
189             # on Windows there is a different (also hacky) fix, applied above.
190             #
191             # note that TIOCSTI is not implemented in WSL / bash-on-Windows.
192             # TODO: introduce some workaround to make it work there.
193             #
194             # Note: This would throw exception in testing mode when the stdin is connected to PTY.
195             import fcntl, termios
196             fcntl.ioctl(self.console.fd, termios.TIOCSTI, b'\0')
197
198 class SerialReader(StoppableThread):
199     """ Read serial data from the serial port and push to the
200     event queue, until stopped.
201     """
202     def __init__(self, serial, event_queue):
203         super(SerialReader, self).__init__()
204         self.baud = serial.baudrate
205         self.serial = serial
206         self.event_queue = event_queue
207         if not hasattr(self.serial, 'cancel_read'):
208             # enable timeout for checking alive flag,
209             # if cancel_read not available
210             self.serial.timeout = 0.25
211
212     def run(self):
213         if not self.serial.is_open:
214             self.serial.baudrate = self.baud
215             self.serial.rts = True  # Force an RTS reset on open
216             self.serial.open()
217             self.serial.rts = False
218         try:
219             while self.alive:
220                 data = self.serial.read(self.serial.in_waiting or 1)
221                 if len(data):
222                     self.event_queue.put((TAG_SERIAL, data), False)
223         finally:
224             self.serial.close()
225
226     def _cancel(self):
227         if hasattr(self.serial, 'cancel_read'):
228             try:
229                 self.serial.cancel_read()
230             except:
231                 pass
232
233 class LineMatcher(object):
234     """
235     Assembles a dictionary of filtering rules based on the --print_filter
236     argument of idf_monitor. Then later it is used to match lines and
237     determine whether they should be shown on screen or not.
238     """
239     LEVEL_N = 0
240     LEVEL_E = 1
241     LEVEL_W = 2
242     LEVEL_I = 3
243     LEVEL_D = 4
244     LEVEL_V = 5
245
246     level = {'N': LEVEL_N, 'E': LEVEL_E, 'W': LEVEL_W, 'I': LEVEL_I, 'D': LEVEL_D,
247             'V': LEVEL_V, '*': LEVEL_V, '': LEVEL_V}
248
249     def __init__(self, print_filter):
250         self._dict = dict()
251         self._re = re.compile(r'^(?:\033\[[01];?[0-9]+m?)?([EWIDV]) \([0-9]+\) ([^:]+): ')
252         items = print_filter.split()
253         if len(items) == 0:
254             self._dict["*"] = self.LEVEL_V # default is to print everything
255         for f in items:
256             s = f.split(r':')
257             if len(s) == 1:
258                 # specifying no warning level defaults to verbose level
259                 lev = self.LEVEL_V
260             elif len(s) == 2:
261                 if len(s[0]) == 0:
262                     raise ValueError('No tag specified in filter ' + f)
263                 try:
264                     lev = self.level[s[1].upper()]
265                 except KeyError:
266                     raise ValueError('Unknown warning level in filter ' + f)
267             else:
268                 raise ValueError('Missing ":" in filter ' + f)
269             self._dict[s[0]] = lev
270     def match(self, line):
271         try:
272             m = self._re.search(line)
273             if m:
274                 lev = self.level[m.group(1)]
275                 if m.group(2) in self._dict:
276                     return self._dict[m.group(2)] >= lev
277                 return self._dict.get("*", self.LEVEL_N) >= lev
278         except (KeyError, IndexError):
279             # Regular line written with something else than ESP_LOG*
280             # or an empty line.
281             pass
282         # We need something more than "*.N" for printing.
283         return self._dict.get("*", self.LEVEL_N) > self.LEVEL_N
284
285 class SerialStopException(Exception):
286     """
287     This exception is used for stopping the IDF monitor in testing mode.
288     """
289     pass
290
291 class Monitor(object):
292     """
293     Monitor application main class.
294
295     This was originally derived from miniterm.Miniterm, but it turned out to be easier to write from scratch for this
296     purpose.
297
298     Main difference is that all event processing happens in the main thread, not the worker threads.
299     """
300     def __init__(self, serial_instance, elf_file, print_filter, make="make", toolchain_prefix=DEFAULT_TOOLCHAIN_PREFIX, eol="CRLF"):
301         super(Monitor, self).__init__()
302         self.event_queue = queue.Queue()
303         self.console = miniterm.Console()
304         if os.name == 'nt':
305             sys.stderr = ANSIColorConverter(sys.stderr, decode_output=True)
306             self.console.output = ANSIColorConverter(self.console.output)
307             self.console.byte_output = ANSIColorConverter(self.console.byte_output)
308
309         if StrictVersion(serial.VERSION) < StrictVersion('3.3.0'):
310             # Use Console.getkey implementation from 3.3.0 (to be in sync with the ConsoleReader._cancel patch above)
311             def getkey_patched(self):
312                 c = self.enc_stdin.read(1)
313                 if c == chr(0x7f):
314                     c = chr(8)    # map the BS key (which yields DEL) to backspace
315                 return c
316
317             self.console.getkey = types.MethodType(getkey_patched, self.console)
318
319         socket_mode = serial_instance.port.startswith("socket://") # testing hook - data from serial can make exit the monitor
320         self.serial = serial_instance
321         self.console_reader = ConsoleReader(self.console, self.event_queue, socket_mode)
322         self.serial_reader = SerialReader(self.serial, self.event_queue)
323         self.elf_file = elf_file
324         if not os.path.exists(make):
325             self.make = shlex.split(make)  # allow for possibility the "make" arg is a list of arguments (for idf.py)
326         else:
327             self.make = make
328         self.toolchain_prefix = toolchain_prefix
329         self.menu_key = CTRL_T
330         self.exit_key = CTRL_RBRACKET
331
332         self.translate_eol = {
333             "CRLF": lambda c: c.replace("\n", "\r\n"),
334             "CR":   lambda c: c.replace("\n", "\r"),
335             "LF":   lambda c: c.replace("\r", "\n"),
336         }[eol]
337
338         # internal state
339         self._pressed_menu_key = False
340         self._last_line_part = b""
341         self._gdb_buffer = b""
342         self._pc_address_buffer = b""
343         self._line_matcher = LineMatcher(print_filter)
344         self._invoke_processing_last_line_timer = None
345         self._force_line_print = False
346         self._output_enabled = True
347         self._serial_check_exit = socket_mode
348
349     def invoke_processing_last_line(self):
350         self.event_queue.put((TAG_SERIAL_FLUSH, b''), False)
351
352     def main_loop(self):
353         self.console_reader.start()
354         self.serial_reader.start()
355         try:
356             while self.console_reader.alive and self.serial_reader.alive:
357                 (event_tag, data) = self.event_queue.get()
358                 if event_tag == TAG_KEY:
359                     self.handle_key(data)
360                 elif event_tag == TAG_SERIAL:
361                     self.handle_serial_input(data)
362                     if self._invoke_processing_last_line_timer is not None:
363                         self._invoke_processing_last_line_timer.cancel()
364                     self._invoke_processing_last_line_timer = threading.Timer(0.1, self.invoke_processing_last_line)
365                     self._invoke_processing_last_line_timer.start()
366                     # If no futher data is received in the next short period
367                     # of time then the _invoke_processing_last_line_timer
368                     # generates an event which will result in the finishing of
369                     # the last line. This is fix for handling lines sent
370                     # without EOL.
371                 elif event_tag == TAG_SERIAL_FLUSH:
372                     self.handle_serial_input(data, finalize_line=True)
373                 else:
374                     raise RuntimeError("Bad event data %r" % ((event_tag,data),))
375         except SerialStopException:
376             pass
377         finally:
378             try:
379                 self.console_reader.stop()
380                 self.serial_reader.stop()
381                 # Cancelling _invoke_processing_last_line_timer is not
382                 # important here because receiving empty data doesn't matter.
383                 self._invoke_processing_last_line_timer = None
384             except:
385                 pass
386             sys.stderr.write(ANSI_NORMAL + "\n")
387
388     def handle_key(self, key):
389         if self._pressed_menu_key:
390             self.handle_menu_key(key)
391             self._pressed_menu_key = False
392         elif key == self.menu_key:
393             self._pressed_menu_key = True
394         elif key == self.exit_key:
395             self.console_reader.stop()
396             self.serial_reader.stop()
397         else:
398             try:
399                 key = self.translate_eol(key)
400                 self.serial.write(codecs.encode(key))
401             except serial.SerialException:
402                 pass # this shouldn't happen, but sometimes port has closed in serial thread
403             except UnicodeEncodeError:
404                 pass # this can happen if a non-ascii character was passed, ignoring
405
406     def handle_serial_input(self, data, finalize_line=False):
407         sp = data.split(b'\n')
408         if self._last_line_part != b"":
409             # add unprocessed part from previous "data" to the first line
410             sp[0] = self._last_line_part + sp[0]
411             self._last_line_part = b""
412         if sp[-1] != b"":
413             # last part is not a full line
414             self._last_line_part = sp.pop()
415         for line in sp:
416             if line != b"":
417                 if self._serial_check_exit and line == self.exit_key.encode('latin-1'):
418                     raise SerialStopException()
419                 if self._output_enabled and (self._force_line_print or self._line_matcher.match(line.decode(errors="ignore"))):
420                     self.console.write_bytes(line + b'\n')
421                     self.handle_possible_pc_address_in_line(line)
422                 self.check_gdbstub_trigger(line)
423                 self._force_line_print = False
424         # Now we have the last part (incomplete line) in _last_line_part. By
425         # default we don't touch it and just wait for the arrival of the rest
426         # of the line. But after some time when we didn't received it we need
427         # to make a decision.
428         if self._last_line_part != b"":
429             if self._force_line_print or (finalize_line and self._line_matcher.match(self._last_line_part.decode(errors="ignore"))):
430                 self._force_line_print = True;
431                 if self._output_enabled:
432                     self.console.write_bytes(self._last_line_part)
433                     self.handle_possible_pc_address_in_line(self._last_line_part)
434                 self.check_gdbstub_trigger(self._last_line_part)
435                 # It is possible that the incomplete line cuts in half the PC
436                 # address. A small buffer is kept and will be used the next time
437                 # handle_possible_pc_address_in_line is invoked to avoid this problem.
438                 # MATCH_PCADDR matches 10 character long addresses. Therefore, we
439                 # keep the last 9 characters.
440                 self._pc_address_buffer = self._last_line_part[-9:]
441                 # GDB sequence can be cut in half also. GDB sequence is 7
442                 # characters long, therefore, we save the last 6 characters.
443                 self._gdb_buffer = self._last_line_part[-6:]
444                 self._last_line_part = b""
445         # else: keeping _last_line_part and it will be processed the next time
446         # handle_serial_input is invoked
447
448     def handle_possible_pc_address_in_line(self, line):
449         line = self._pc_address_buffer + line
450         self._pc_address_buffer = b""
451         for m in re.finditer(MATCH_PCADDR, line.decode(errors="ignore")):
452             self.lookup_pc_address(m.group())
453
454     def handle_menu_key(self, c):
455         if c == self.exit_key or c == self.menu_key:  # send verbatim
456             self.serial.write(codecs.encode(c))
457         elif c in [ CTRL_H, 'h', 'H', '?' ]:
458             red_print(self.get_help_text())
459         elif c == CTRL_R:  # Reset device via RTS
460             self.serial.setRTS(True)
461             time.sleep(0.2)
462             self.serial.setRTS(False)
463             self.output_enable(True)
464         elif c == CTRL_F:  # Recompile & upload
465             self.run_make("flash")
466         elif c == CTRL_A:  # Recompile & upload app only
467             self.run_make("app-flash")
468         elif c == CTRL_Y:  # Toggle output display
469             self.output_toggle()
470         elif c == CTRL_P:
471             yellow_print("Pause app (enter bootloader mode), press Ctrl-T Ctrl-R to restart")
472             # to fast trigger pause without press menu key
473             self.serial.setDTR(False)  # IO0=HIGH
474             self.serial.setRTS(True)   # EN=LOW, chip in reset
475             time.sleep(1.3) # timeouts taken from esptool.py, includes esp32r0 workaround. defaults: 0.1
476             self.serial.setDTR(True)   # IO0=LOW
477             self.serial.setRTS(False)  # EN=HIGH, chip out of reset
478             time.sleep(0.45) # timeouts taken from esptool.py, includes esp32r0 workaround. defaults: 0.05
479             self.serial.setDTR(False)  # IO0=HIGH, done
480         else:
481             red_print('--- unknown menu character {} --'.format(key_description(c)))
482
483     def get_help_text(self):
484         return """
485 --- idf_monitor ({version}) - ESP-IDF monitor tool
486 --- based on miniterm from pySerial
487 ---
488 --- {exit:8} Exit program
489 --- {menu:8} Menu escape key, followed by:
490 --- Menu keys:
491 ---    {menu:7} Send the menu character itself to remote
492 ---    {exit:7} Send the exit character itself to remote
493 ---    {reset:7} Reset target board via RTS line
494 ---    {makecmd:7} Build & flash project
495 ---    {appmake:7} Build & flash app only
496 ---    {output:7} Toggle output display
497 ---    {pause:7} Reset target into bootloader to pause app via RTS line
498 """.format(version=__version__,
499            exit=key_description(self.exit_key),
500            menu=key_description(self.menu_key),
501            reset=key_description(CTRL_R),
502            makecmd=key_description(CTRL_F),
503            appmake=key_description(CTRL_A),
504            output=key_description(CTRL_Y),
505            pause=key_description(CTRL_P) )
506
507     def __enter__(self):
508         """ Use 'with self' to temporarily disable monitoring behaviour """
509         self.serial_reader.stop()
510         self.console_reader.stop()
511
512     def __exit__(self, *args, **kwargs):
513         """ Use 'with self' to temporarily disable monitoring behaviour """
514         self.console_reader.start()
515         self.serial_reader.start()
516
517     def prompt_next_action(self, reason):
518         self.console.setup()  # set up console to trap input characters
519         try:
520             red_print("""
521 --- {}
522 --- Press {} to exit monitor.
523 --- Press {} to build & flash project.
524 --- Press {} to build & flash app.
525 --- Press any other key to resume monitor (resets target).""".format(reason,
526                                                                      key_description(self.exit_key),
527                                                                      key_description(CTRL_F),
528                                                                      key_description(CTRL_A) ))
529             k = CTRL_T  # ignore CTRL-T here, so people can muscle-memory Ctrl-T Ctrl-F, etc.
530             while k == CTRL_T:
531                 k = self.console.getkey()
532         finally:
533             self.console.cleanup()
534         if k == self.exit_key:
535             self.event_queue.put((TAG_KEY, k))
536         elif k in [ CTRL_F, CTRL_A ]:
537             self.event_queue.put((TAG_KEY, self.menu_key))
538             self.event_queue.put((TAG_KEY, k))
539
540     def run_make(self, target):
541         with self:
542             if isinstance(self.make, list):
543                 popen_args = self.make + [ target ]
544             else:
545                 popen_args = [ self.make, target ]
546             yellow_print("Running %s..." % " ".join(popen_args))
547             p = subprocess.Popen(popen_args)
548             try:
549                 p.wait()
550             except KeyboardInterrupt:
551                 p.wait()
552             if p.returncode != 0:
553                 self.prompt_next_action("Build failed")
554             else:
555                 self.output_enable(True)
556
557     def lookup_pc_address(self, pc_addr):
558         translation = subprocess.check_output(
559             ["%saddr2line" % self.toolchain_prefix,
560              "-pfiaC", "-e", self.elf_file, pc_addr],
561             cwd=".")
562         if not b"?? ??:0" in translation:
563             yellow_print(translation.decode())
564
565     def check_gdbstub_trigger(self, line):
566         line = self._gdb_buffer + line
567         self._gdb_buffer = b""
568         m = re.search(b"\\$(T..)#(..)", line) # look for a gdb "reason" for a break
569         if m is not None:
570             try:
571                 chsum = sum(ord(bytes([p])) for p in m.group(1)) & 0xFF
572                 calc_chsum = int(m.group(2), 16)
573             except ValueError:
574                 return  # payload wasn't valid hex digits
575             if chsum == calc_chsum:
576                 self.run_gdb()
577             else:
578                 red_print("Malformed gdb message... calculated checksum %02x received %02x" % (chsum, calc_chsum))
579
580
581     def run_gdb(self):
582         with self:  # disable console control
583             sys.stderr.write(ANSI_NORMAL)
584             try:
585                 process = subprocess.Popen(["%sgdb" % self.toolchain_prefix,
586                                 "-ex", "set serial baud %d" % self.serial.baudrate,
587                                 "-ex", "target remote %s" % self.serial.port,
588                                 "-ex", "interrupt",  # monitor has already parsed the first 'reason' command, need a second
589                                 self.elf_file], cwd=".")
590                 process.wait()
591             except KeyboardInterrupt:
592                 pass  # happens on Windows, maybe other OSes
593             finally:
594                 try:
595                     # on Linux, maybe other OSes, gdb sometimes seems to be alive even after wait() returns...
596                     process.terminate()
597                 except:
598                     pass
599                 try:
600                     # also on Linux, maybe other OSes, gdb sometimes exits uncleanly and breaks the tty mode
601                     subprocess.call(["stty", "sane"])
602                 except:
603                     pass  # don't care if there's no stty, we tried...
604             self.prompt_next_action("gdb exited")
605
606     def output_enable(self, enable):
607         self._output_enabled = enable
608
609     def output_toggle(self):
610         self._output_enabled = not self._output_enabled
611         yellow_print("\nToggle output display: {}, Type Ctrl-T Ctrl-Y to show/disable output again.".format(self._output_enabled))
612
613 def main():
614     parser = argparse.ArgumentParser("idf_monitor - a serial output monitor for esp-idf")
615
616     parser.add_argument(
617         '--port', '-p',
618         help='Serial port device',
619         default=os.environ.get('ESPTOOL_PORT', '/dev/ttyUSB0')
620     )
621
622     parser.add_argument(
623         '--baud', '-b',
624         help='Serial port baud rate',
625         type=int,
626         default=os.environ.get('MONITOR_BAUD', 115200))
627
628     parser.add_argument(
629         '--make', '-m',
630         help='Command to run make',
631         type=str, default='make')
632
633     parser.add_argument(
634         '--toolchain-prefix',
635         help="Triplet prefix to add before cross-toolchain names",
636         default=DEFAULT_TOOLCHAIN_PREFIX)
637
638     parser.add_argument(
639         "--eol",
640         choices=['CR', 'LF', 'CRLF'],
641         type=lambda c: c.upper(),
642         help="End of line to use when sending to the serial port",
643         default='CR')
644
645     parser.add_argument(
646         'elf_file', help='ELF file of application',
647         type=argparse.FileType('rb'))
648
649     parser.add_argument(
650         '--print_filter',
651         help="Filtering string",
652         default=DEFAULT_PRINT_FILTER)
653
654     args = parser.parse_args()
655
656     if args.port.startswith("/dev/tty."):
657         args.port = args.port.replace("/dev/tty.", "/dev/cu.")
658         yellow_print("--- WARNING: Serial ports accessed as /dev/tty.* will hang gdb if launched.")
659         yellow_print("--- Using %s instead..." % args.port)
660
661     serial_instance = serial.serial_for_url(args.port, args.baud,
662                                             do_not_open=True)
663     serial_instance.dtr = False
664     serial_instance.rts = False
665
666     args.elf_file.close()  # don't need this as a file
667
668     # remove the parallel jobserver arguments from MAKEFLAGS, as any
669     # parent make is only running 1 job (monitor), so we can re-spawn
670     # all of the child makes we need (the -j argument remains part of
671     # MAKEFLAGS)
672     try:
673         makeflags = os.environ["MAKEFLAGS"]
674         makeflags = re.sub(r"--jobserver[^ =]*=[0-9,]+ ?", "", makeflags)
675         os.environ["MAKEFLAGS"] = makeflags
676     except KeyError:
677         pass  # not running a make jobserver
678
679     monitor = Monitor(serial_instance, args.elf_file.name, args.print_filter, args.make, args.toolchain_prefix, args.eol)
680
681     yellow_print('--- idf_monitor on {p.name} {p.baudrate} ---'.format(
682         p=serial_instance))
683     yellow_print('--- Quit: {} | Menu: {} | Help: {} followed by {} ---'.format(
684         key_description(monitor.exit_key),
685         key_description(monitor.menu_key),
686         key_description(monitor.menu_key),
687         key_description(CTRL_H)))
688     if args.print_filter != DEFAULT_PRINT_FILTER:
689         yellow_print('--- Print filter: {} ---'.format(args.print_filter))
690
691     monitor.main_loop()
692
693 if os.name == 'nt':
694     # Windows console stuff
695
696     STD_OUTPUT_HANDLE = -11
697     STD_ERROR_HANDLE = -12
698
699     # wincon.h values
700     FOREGROUND_INTENSITY = 8
701     FOREGROUND_GREY = 7
702
703     # matches the ANSI color change sequences that IDF sends
704     RE_ANSI_COLOR = re.compile(b'\033\\[([01]);3([0-7])m')
705
706     # list mapping the 8 ANSI colors (the indexes) to Windows Console colors
707     ANSI_TO_WINDOWS_COLOR = [ 0, 4, 2, 6, 1, 5, 3, 7 ]
708
709     GetStdHandle = ctypes.windll.kernel32.GetStdHandle
710     SetConsoleTextAttribute = ctypes.windll.kernel32.SetConsoleTextAttribute
711
712     class ANSIColorConverter(object):
713         """Class to wrap a file-like output stream, intercept ANSI color codes,
714         and convert them into calls to Windows SetConsoleTextAttribute.
715
716         Doesn't support all ANSI terminal code escape sequences, only the sequences IDF uses.
717
718         Ironically, in Windows this console output is normally wrapped by winpty which will then detect the console text
719         color changes and convert these back to ANSI color codes for MSYS' terminal to display. However this is the
720         least-bad working solution, as winpty doesn't support any "passthrough" mode for raw output.
721         """
722
723         def __init__(self, output=None, decode_output=False):
724             self.output = output
725             self.decode_output = decode_output
726             self.handle = GetStdHandle(STD_ERROR_HANDLE if self.output == sys.stderr else STD_OUTPUT_HANDLE)
727             self.matched = b''
728
729         def _output_write(self, data):
730             try:
731                 if self.decode_output:
732                     self.output.write(data.decode())
733                 else:
734                     self.output.write(data)
735             except IOError:
736                 # Windows 10 bug since the Fall Creators Update, sometimes writing to console randomly throws
737                 # an exception (however, the character is still written to the screen)
738                 # Ref https://github.com/espressif/esp-idf/issues/1136
739                 pass
740
741         def write(self, data):
742             if isinstance(data, bytes):
743                 data = bytearray(data)
744             else:
745                 data = bytearray(data, 'utf-8')
746             for b in data:
747                 b = bytes([b])
748                 l = len(self.matched)
749                 if b == b'\033':  # ESC
750                     self.matched = b
751                 elif (l == 1 and b == b'[') or (1 < l < 7):
752                     self.matched += b
753                     if self.matched == ANSI_NORMAL.encode('latin-1'):  # reset console
754                         # Flush is required only with Python3 - switching color before it is printed would mess up the console
755                         self.flush()
756                         SetConsoleTextAttribute(self.handle, FOREGROUND_GREY)
757                         self.matched = b''
758                     elif len(self.matched) == 7:     # could be an ANSI sequence
759                         m = re.match(RE_ANSI_COLOR, self.matched)
760                         if m is not None:
761                             color = ANSI_TO_WINDOWS_COLOR[int(m.group(2))]
762                             if m.group(1) == b'1':
763                                 color |= FOREGROUND_INTENSITY
764                             # Flush is required only with Python3 - switching color before it is printed would mess up the console
765                             self.flush()
766                             SetConsoleTextAttribute(self.handle, color)
767                         else:
768                             self._output_write(self.matched) # not an ANSI color code, display verbatim
769                         self.matched = b''
770                 else:
771                     self._output_write(b)
772                     self.matched = b''
773
774         def flush(self):
775             self.output.flush()
776
777 if __name__ == "__main__":
778     main()