Skip to content

Commit

Permalink
tools: Support Python3 in idf_monitor
Browse files Browse the repository at this point in the history
Closes #1284
  • Loading branch information
dobairoland committed Aug 24, 2018
1 parent c7f4fae commit 7118f47
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 37 deletions.
5 changes: 5 additions & 0 deletions .gitlab-ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -358,7 +358,12 @@ test_idf_monitor:
expire_in: 1 week
script:
- cd ${IDF_PATH}/tools/test_idf_monitor
- source /opt/pyenv/activate
- pyenv global 2.7.15
- ./run_test_idf_monitor.py
- pyenv global 3.4.8
- ./run_test_idf_monitor.py
- pyenv global system

test_esp_err_to_name_on_host:
<<: *host_test_template
Expand Down
56 changes: 36 additions & 20 deletions tools/idf_monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,12 @@
# Originally released under BSD-3-Clause license.
#
from __future__ import print_function, division
from __future__ import unicode_literals
from future import standard_library
standard_library.install_aliases()
from builtins import chr
from builtins import object
from builtins import bytes
import subprocess
import argparse
import codecs
Expand Down Expand Up @@ -223,7 +229,7 @@ def _cancel(self):
except:
pass

class LineMatcher:
class LineMatcher(object):
"""
Assembles a dictionary of filtering rules based on the --print_filter
argument of idf_monitor. Then later it is used to match lines and
Expand Down Expand Up @@ -295,16 +301,16 @@ def __init__(self, serial_instance, elf_file, print_filter, make="make", toolcha
self.event_queue = queue.Queue()
self.console = miniterm.Console()
if os.name == 'nt':
sys.stderr = ANSIColorConverter(sys.stderr)
sys.stderr = ANSIColorConverter(sys.stderr, decode_output=True)
self.console.output = ANSIColorConverter(self.console.output)
self.console.byte_output = ANSIColorConverter(self.console.byte_output)

if StrictVersion(serial.VERSION) < StrictVersion('3.3.0'):
# Use Console.getkey implementation from 3.3.0 (to be in sync with the ConsoleReader._cancel patch above)
def getkey_patched(self):
c = self.enc_stdin.read(1)
if c == unichr(0x7f):
c = unichr(8) # map the BS key (which yields DEL) to backspace
if c == chr(0x7f):
c = chr(8) # map the BS key (which yields DEL) to backspace
return c

self.console.getkey = types.MethodType(getkey_patched, self.console)
Expand All @@ -320,9 +326,9 @@ def getkey_patched(self):
self.exit_key = CTRL_RBRACKET

self.translate_eol = {
"CRLF": lambda c: c.replace(b"\n", b"\r\n"),
"CR": lambda c: c.replace(b"\n", b"\r"),
"LF": lambda c: c.replace(b"\r", b"\n"),
"CRLF": lambda c: c.replace("\n", "\r\n"),
"CR": lambda c: c.replace("\n", "\r"),
"LF": lambda c: c.replace("\r", "\n"),
}[eol]

# internal state
Expand Down Expand Up @@ -404,9 +410,9 @@ def handle_serial_input(self, data, finalize_line=False):
self._last_line_part = sp.pop()
for line in sp:
if line != b"":
if self._serial_check_exit and line == self.exit_key:
if self._serial_check_exit and line == self.exit_key.encode('latin-1'):
raise SerialStopException()
if self._output_enabled and (self._force_line_print or self._line_matcher.match(line)):
if self._output_enabled and (self._force_line_print or self._line_matcher.match(line.decode(errors="ignore"))):
self.console.write_bytes(line + b'\n')
self.handle_possible_pc_address_in_line(line)
self.check_gdbstub_trigger(line)
Expand All @@ -416,7 +422,7 @@ def handle_serial_input(self, data, finalize_line=False):
# of the line. But after some time when we didn't received it we need
# to make a decision.
if self._last_line_part != b"":
if self._force_line_print or (finalize_line and self._line_matcher.match(self._last_line_part)):
if self._force_line_print or (finalize_line and self._line_matcher.match(self._last_line_part.decode(errors="ignore"))):
self._force_line_print = True;
if self._output_enabled:
self.console.write_bytes(self._last_line_part)
Expand All @@ -438,7 +444,7 @@ def handle_serial_input(self, data, finalize_line=False):
def handle_possible_pc_address_in_line(self, line):
line = self._pc_address_buffer + line
self._pc_address_buffer = b""
for m in re.finditer(MATCH_PCADDR, line):
for m in re.finditer(MATCH_PCADDR, line.decode(errors="ignore")):
self.lookup_pc_address(m.group())

def handle_menu_key(self, c):
Expand Down Expand Up @@ -547,16 +553,16 @@ def lookup_pc_address(self, pc_addr):
["%saddr2line" % self.toolchain_prefix,
"-pfiaC", "-e", self.elf_file, pc_addr],
cwd=".")
if not "?? ??:0" in translation:
yellow_print(translation)
if not b"?? ??:0" in translation:
yellow_print(translation.decode())

def check_gdbstub_trigger(self, line):
line = self._gdb_buffer + line
self._gdb_buffer = b""
m = re.search(b"\\$(T..)#(..)", line) # look for a gdb "reason" for a break
if m is not None:
try:
chsum = sum(ord(p) for p in m.group(1)) & 0xFF
chsum = sum(ord(bytes([p])) for p in m.group(1)) & 0xFF
calc_chsum = int(m.group(2), 16)
except ValueError:
return # payload wasn't valid hex digits
Expand Down Expand Up @@ -708,28 +714,37 @@ class ANSIColorConverter(object):
least-bad working solution, as winpty doesn't support any "passthrough" mode for raw output.
"""

def __init__(self, output):
def __init__(self, output=None, decode_output=False):
self.output = output
self.decode_output = decode_output
self.handle = GetStdHandle(STD_ERROR_HANDLE if self.output == sys.stderr else STD_OUTPUT_HANDLE)
self.matched = b''

def _output_write(self, data):
try:
self.output.write(data)
if self.decode_output:
self.output.write(data.decode())
else:
self.output.write(data)
except IOError:
# Windows 10 bug since the Fall Creators Update, sometimes writing to console randomly throws
# an exception (however, the character is still written to the screen)
# Ref https://github.com/espressif/esp-idf/issues/1136
pass

def write(self, data):
if type(data) is not bytes:
data = data.encode('latin-1')
for b in data:
b = bytes([b])
l = len(self.matched)
if b == '\033': # ESC
if b == b'\033': # ESC
self.matched = b
elif (l == 1 and b == '[') or (1 < l < 7):
elif (l == 1 and b == b'[') or (1 < l < 7):
self.matched += b
if self.matched == ANSI_NORMAL: # reset console
if self.matched == ANSI_NORMAL.encode('latin-1'): # reset console
# Flush is required only with Python3 - switching color before it is printed would mess up the console
self.flush()
SetConsoleTextAttribute(self.handle, FOREGROUND_GREY)
self.matched = b''
elif len(self.matched) == 7: # could be an ANSI sequence
Expand All @@ -738,6 +753,8 @@ def write(self, data):
color = ANSI_TO_WINDOWS_COLOR[int(m.group(2))]
if m.group(1) == b'1':
color |= FOREGROUND_INTENSITY
# Flush is required only with Python3 - switching color before it is printed would mess up the console
self.flush()
SetConsoleTextAttribute(self.handle, color)
else:
self._output_write(self.matched) # not an ANSI color code, display verbatim
Expand All @@ -749,6 +766,5 @@ def write(self, data):
def flush(self):
self.output.flush()


if __name__ == "__main__":
main()
38 changes: 21 additions & 17 deletions tools/test_idf_monitor/run_test_idf_monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import print_function
from __future__ import unicode_literals
from builtins import object
from io import open
import os
import signal
import time
Expand All @@ -34,11 +38,11 @@
in_dir = 'tests/' # tests are in this directory
out_dir = 'outputs/' # test results are written to this directory (kept only for debugging purposes)
socat_in = './socatfile'# temporary socat file (deleted after run)
monitor_error_output = out_dir + 'monitor_error_output'
err_out = out_dir + 'monitor_error_output'
elf_file = './dummy.elf' # ELF file used for starting the monitor
idf_monitor = '{}/tools/idf_monitor.py'.format(os.getenv("IDF_PATH"))

class SocatRunner:
class SocatRunner(object):
"""
Runs socat in the background for creating a socket.
"""
Expand All @@ -54,7 +58,7 @@ def __enter__(self):
'-U', # unidirectional pipe from file to port
'TCP4-LISTEN:2399,reuseaddr,fork',
'exec:"tail -c 1GB -F ' + socat_in + '"']
print ' '.join(socat_cmd)
print(' '.join(socat_cmd))
self._socat_process = subprocess.Popen(socat_cmd, preexec_fn=os.setsid) # See __exit__ for os.setsid
return self

Expand Down Expand Up @@ -82,38 +86,38 @@ def main():
# another reason while the temporary socat_in file is used instead of directly reading the test files).
time.sleep(1)
for t in test_list:
print 'Running test on {} with filter "{}" and expecting {}'.format(t[0], t[1], t[2])
with open(in_dir + t[0], "r") as input_file, open(socat_in, "w") as socat_file:
print 'cat {} > {}'.format(input_file.name, socat_file.name)
for line in input_file:
socat_file.write(line)
print('Running test on {} with filter "{}" and expecting {}'.format(t[0], t[1], t[2]))
with open(in_dir + t[0], "r", encoding='utf-8') as i_f, open(socat_in, "w", encoding='utf-8') as s_f:
print('cat {} > {}'.format(i_f.name, s_f.name))
for line in i_f:
s_f.write(line)
idf_exit_sequence = b'\x1d\n'
print 'echo "<exit>" >> {}'.format(socat_file.name)
socat_file.write(idf_exit_sequence)
print('echo "<exit>" >> {}'.format(s_f.name))
s_f.write(idf_exit_sequence.decode())
monitor_cmd = [idf_monitor,
'--port', 'socket://localhost:2399',
'--print_filter', t[1],
elf_file]
with open(out_dir + t[2], "w") as mon_out_f, open(monitor_error_output, "w") as mon_err_f:
with open(out_dir + t[2], "w", encoding='utf-8') as o_f, open(err_out, "w", encoding='utf-8') as e_f:
try:
(master_fd, slave_fd) = pty.openpty()
print ' '.join(monitor_cmd),
print ' > {} 2> {} < {}'.format(mon_out_f.name, mon_err_f.name, os.ttyname(slave_fd))
proc = subprocess.Popen(monitor_cmd, stdin=slave_fd, stdout=mon_out_f, stderr=mon_err_f,
print(' '.join(monitor_cmd), end=' ')
print(' > {} 2> {} < {}'.format(o_f.name, e_f.name, os.ttyname(slave_fd)))
proc = subprocess.Popen(monitor_cmd, stdin=slave_fd, stdout=o_f, stderr=e_f,
close_fds=True)
proc.wait()
finally:
os.close(slave_fd)
os.close(master_fd)
diff_cmd = ['diff', in_dir + t[2], out_dir + t[2]]
print ' '.join(diff_cmd)
print(' '.join(diff_cmd))
subprocess.check_call(diff_cmd)
print 'Test has passed'
print('Test has passed')
finally:
cleanup()

end = time.time()
print 'Execution took {:.2f} seconds'.format(end - start)
print('Execution took {:.2f} seconds'.format(end - start))

if __name__ == "__main__":
main()

0 comments on commit 7118f47

Please sign in to comment.