Skip to content

Commit

Permalink
tools: fix multi-byte character appearance in idf.py monitor
Browse files Browse the repository at this point in the history
  • Loading branch information
simon.chupin committed Aug 29, 2022
1 parent 39c47cb commit 3423042
Showing 1 changed file with 30 additions and 12 deletions.
42 changes: 30 additions & 12 deletions tools/idf_py_actions/tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -233,34 +233,52 @@ def delete_ansi_escape(text: str) -> str:
ansi_escape = re.compile(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])')
return ansi_escape.sub('', text)

def prepare_for_print(out: bytes) -> str:
# errors='ignore' is here because some chips produce some garbage bytes
result = out.decode(errors='ignore')
def prepare_for_print(out: str) -> str:
if not output_stream.isatty():
# delete escape sequence if we printing in environments where ANSI coloring is disabled
return delete_ansi_escape(result)
return result
return delete_ansi_escape(out)
return out

def print_progression(output: str) -> None:
# Print a new line on top of the previous line
sys.stdout.write('\x1b[K')
print('\r', end='')
print(fit_text_in_terminal(output.strip('\n\r')), end='', file=output_stream)

async def read_stream() -> Optional[str]:
output_b = await input_stream.readline()
if not output_b:
return None
return output_b.decode(errors='ignore')

async def read_interactive_stream() -> Optional[str]:
buffer = b''
while True:
output_b = await input_stream.read(1)
if not output_b:
return None
try:
return (buffer + output_b).decode()
except UnicodeDecodeError:
buffer += output_b
if len(buffer) > 4:
# Multi-byte character contain up to 4 bytes and if buffer have more then 4 bytes
# and still can not decode it we can just ignore some bytes
return buffer.decode(errors='ignore')

try:
with open(output_filename, 'w') as output_file:
with open(output_filename, 'w', encoding='utf8') as output_file:
while True:
if self.interactive:
out = await input_stream.read(1)
output = await read_interactive_stream()
else:
out = await input_stream.readline()
if not out:
output = await read_stream()
if not output:
break
output = prepare_for_print(out)
output = prepare_for_print(output)
output_file.write(output)

# print output in progression way but only the progression related (that started with '[') and if verbose flag is not set
if self.force_progression and output[0] == '[' and '-v' not in self.args and output_stream.isatty():
# print output in progression way but only the progression related (that started with '[') and if verbose flag is not set
print_progression(output)
else:
output_stream.write(output)
Expand Down

0 comments on commit 3423042

Please sign in to comment.