Skip to content

Commit

Permalink
[dagster-pipes] allow routing logs to arbitrary files
Browse files Browse the repository at this point in the history
Docs typo fix.
  • Loading branch information
danielgafni committed Sep 10, 2024
1 parent f8216a0 commit a191a5c
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,12 @@
def hello(context: dg.AssetExecutionContext):
    # Emit the first half of the greeting through the logger exposed on the
    # execution context. A comment (not a docstring) is used on purpose so the
    # function's __doc__ stays unset.
    logger = context.log
    logger.info("Hello!")


@dg.asset
def world(context: dg.AssetExecutionContext):
    # Emit the second half of the greeting through the logger exposed on the
    # execution context. A comment (not a docstring) is used on purpose so the
    # function's __doc__ stays unset.
    logger = context.log
    logger.info("World!")


# Collect both assets defined above into a single Definitions object.
defs = dg.Definitions(assets=[hello, world])

if __name__ == "__main__":
Expand Down
10 changes: 7 additions & 3 deletions python_modules/dagster/dagster/_core/pipes/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -512,17 +512,21 @@ def _join_thread(thread: Thread, thread_name: str) -> None:
raise DagsterPipesExecutionError(f"Timed out waiting for {thread_name} thread to finish.")


# Route one log line: lines that parse as JSON and carry the Pipes protocol version
# field are passed to the handler; anything else is written, followed by a newline,
# to an output stream.
# NOTE(review): this span appears to interleave removed and added diff lines (two
# `def` headers, paired `sys.stdout` / `file` writes) — confirm which lines are live
# before editing the code itself.
def extract_message_or_forward_to_stdout(handler: "PipesMessageHandler", log_line: str):
def extract_message_or_forward_to_file(handler: "PipesMessageHandler", log_line: str, file: TextIO):
# exceptions as control flow, you love to see it: a line that fails to parse as JSON
# (or any other error raised inside the `try`) falls through to the `except` branch
# and is forwarded as plain text
try:
message = json.loads(log_line)
# only parsed values exposing the protocol version key are treated as Pipes messages
if PIPES_PROTOCOL_VERSION_FIELD in message.keys():
handler.handle_message(message)
else:
sys.stdout.writelines((log_line, "\n"))
file.writelines((log_line, "\n"))
except Exception:
# forward non-message logs to the output stream (`sys.stdout` in the old line,
# the caller-supplied `file` in the new one) for compute log capture
sys.stdout.writelines((log_line, "\n"))
file.writelines((log_line, "\n"))


def extract_message_or_forward_to_stdout(handler: "PipesMessageHandler", log_line: str):
    """Process ``log_line`` with ``sys.stdout`` as the destination stream.

    Thin wrapper that delegates to ``extract_message_or_forward_to_file``;
    ``sys.stdout`` is looked up at call time, not at import time.
    """
    stdout_stream = sys.stdout
    extract_message_or_forward_to_file(handler, log_line, stdout_stream)


_FAIL_TO_YIELD_ERROR_MESSAGE = (
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import base64
import random
import string
import sys
from contextlib import contextmanager
from typing import Any, Dict, Generator, Iterator, List, Optional, Sequence, TypedDict
from typing import Any, Dict, Generator, Iterator, List, Optional, Sequence, TextIO, TypedDict

import boto3
import dagster._check as check
Expand All @@ -13,6 +14,7 @@
from dagster._core.pipes.utils import (
PipesBlobStoreMessageReader,
PipesLogReader,
extract_message_or_forward_to_file,
extract_message_or_forward_to_stdout,
)
from dagster_pipes import PipesDefaultMessageWriter
Expand Down Expand Up @@ -139,6 +141,7 @@ def consume_cloudwatch_logs(
log_stream: str,
start_time: Optional[int] = None,
end_time: Optional[int] = None,
output_file: TextIO = sys.stdout,
) -> None:
"""Reads logs from AWS CloudWatch and forwards them to Dagster for events extraction and logging.
Expand All @@ -151,6 +154,7 @@ def consume_cloudwatch_logs(
end_time (Optional[int]): The end of the time range, expressed as the number of
milliseconds after ``Jan 1, 1970 00:00:00 UTC``. Events with a timestamp equal to or
later than this time are not included.
output_file (TextIO): A file to write the logs to. Defaults to ``sys.stdout``.
"""
handler = check.not_none(
self._handler, "Can only consume logs within context manager scope."
Expand All @@ -161,7 +165,7 @@ def consume_cloudwatch_logs(
):
for event in events_batch:
for log_line in event["message"].splitlines():
extract_message_or_forward_to_stdout(handler, log_line)
extract_message_or_forward_to_file(handler, log_line, output_file)

def no_messages_debug_text(self) -> str:
    """Return the debug text stating that messages were read from CloudWatch logs."""
    debug_text = "Attempted to read messages by extracting them from CloudWatch logs directly."
    return debug_text
Expand Down

0 comments on commit a191a5c

Please sign in to comment.