From a191a5c3b180a04fa24f90afa971d1ae51ca37e4 Mon Sep 17 00:00:00 2001 From: Daniel Gafni Date: Tue, 10 Sep 2024 15:12:25 +0200 Subject: [PATCH] [dagster-pipes] allow routing logs to arbitrary files Docs typo fix. --- .../docs_beta_snippets/getting-started/hello-world.py | 2 ++ python_modules/dagster/dagster/_core/pipes/utils.py | 10 +++++++--- .../dagster-aws/dagster_aws/pipes/message_readers.py | 8 ++++++-- 3 files changed, 15 insertions(+), 5 deletions(-) diff --git a/examples/docs_beta_snippets/docs_beta_snippets/getting-started/hello-world.py b/examples/docs_beta_snippets/docs_beta_snippets/getting-started/hello-world.py index 0b39f793e7250..535fecc28b08c 100644 --- a/examples/docs_beta_snippets/docs_beta_snippets/getting-started/hello-world.py +++ b/examples/docs_beta_snippets/docs_beta_snippets/getting-started/hello-world.py @@ -5,10 +5,12 @@ def hello(context: dg.AssetExecutionContext): context.log.info("Hello!") + @dg.asset def world(context: dg.AssetExecutionContext): context.log.info("World!") + defs = dg.Definitions(assets=[hello, world]) if __name__ == "__main__": diff --git a/python_modules/dagster/dagster/_core/pipes/utils.py b/python_modules/dagster/dagster/_core/pipes/utils.py index 120567fa721ba..690a4487692dd 100644 --- a/python_modules/dagster/dagster/_core/pipes/utils.py +++ b/python_modules/dagster/dagster/_core/pipes/utils.py @@ -512,17 +512,21 @@ def _join_thread(thread: Thread, thread_name: str) -> None: raise DagsterPipesExecutionError(f"Timed out waiting for {thread_name} thread to finish.") -def extract_message_or_forward_to_stdout(handler: "PipesMessageHandler", log_line: str): +def extract_message_or_forward_to_file(handler: "PipesMessageHandler", log_line: str, file: TextIO): # exceptions as control flow, you love to see it try: message = json.loads(log_line) if PIPES_PROTOCOL_VERSION_FIELD in message.keys(): handler.handle_message(message) else: - sys.stdout.writelines((log_line, "\n")) + file.writelines((log_line, "\n")) except Exception: # move non-message logs in to stdout for compute log capture - sys.stdout.writelines((log_line, "\n")) + file.writelines((log_line, "\n")) + + +def extract_message_or_forward_to_stdout(handler: "PipesMessageHandler", log_line: str): + extract_message_or_forward_to_file(handler=handler, log_line=log_line, file=sys.stdout) _FAIL_TO_YIELD_ERROR_MESSAGE = ( diff --git a/python_modules/libraries/dagster-aws/dagster_aws/pipes/message_readers.py b/python_modules/libraries/dagster-aws/dagster_aws/pipes/message_readers.py index 8b278efcc4d5d..787c10730da39 100644 --- a/python_modules/libraries/dagster-aws/dagster_aws/pipes/message_readers.py +++ b/python_modules/libraries/dagster-aws/dagster_aws/pipes/message_readers.py @@ -1,8 +1,9 @@ import base64 import random import string +import sys from contextlib import contextmanager -from typing import Any, Dict, Generator, Iterator, List, Optional, Sequence, TypedDict +from typing import Any, Dict, Generator, Iterator, List, Optional, Sequence, TextIO, TypedDict import boto3 import dagster._check as check @@ -13,6 +14,7 @@ from dagster._core.pipes.utils import ( PipesBlobStoreMessageReader, PipesLogReader, + extract_message_or_forward_to_file, extract_message_or_forward_to_stdout, ) from dagster_pipes import PipesDefaultMessageWriter @@ -139,6 +141,7 @@ def consume_cloudwatch_logs( log_stream: str, start_time: Optional[int] = None, end_time: Optional[int] = None, + output_file: TextIO = sys.stdout, ) -> None: """Reads logs from AWS CloudWatch and forwards them to Dagster for events extraction and logging. @@ -151,6 +154,7 @@ def consume_cloudwatch_logs( end_time (Optional[int]): The end of the time range, expressed as the number of milliseconds after ``Jan 1, 1970 00:00:00 UTC``. Events with a timestamp equal to or later than this time are not included. + output_file: (Optional[TextIO]): A file to write the logs to. Defaults to sys.stdout. """ handler = check.not_none( self._handler, "Can only consume logs within context manager scope." @@ -161,7 +165,7 @@ def consume_cloudwatch_logs( ): for event in events_batch: for log_line in event["message"].splitlines(): - extract_message_or_forward_to_stdout(handler, log_line) + extract_message_or_forward_to_file(handler, log_line, output_file) def no_messages_debug_text(self) -> str: return "Attempted to read messages by extracting them from CloudWatch logs directly."