diff --git a/docs/content/api/modules.json.gz b/docs/content/api/modules.json.gz index dbc21166df0e8..93820e2ecc59e 100644 Binary files a/docs/content/api/modules.json.gz and b/docs/content/api/modules.json.gz differ diff --git a/docs/content/api/searchindex.json.gz b/docs/content/api/searchindex.json.gz index 9d9765a94f4e0..020a91a3ff7e4 100644 Binary files a/docs/content/api/searchindex.json.gz and b/docs/content/api/searchindex.json.gz differ diff --git a/docs/content/api/sections.json.gz b/docs/content/api/sections.json.gz index 5d8d811aff96d..954afc9701671 100644 Binary files a/docs/content/api/sections.json.gz and b/docs/content/api/sections.json.gz differ diff --git a/docs/next/public/objects.inv b/docs/next/public/objects.inv index 0b5514b7cca86..249fa54423a91 100644 Binary files a/docs/next/public/objects.inv and b/docs/next/public/objects.inv differ diff --git a/pyright/alt-1/requirements-pinned.txt b/pyright/alt-1/requirements-pinned.txt index 60c3fefae89a1..1df9e57210d0a 100644 --- a/pyright/alt-1/requirements-pinned.txt +++ b/pyright/alt-1/requirements-pinned.txt @@ -9,7 +9,6 @@ alembic==1.13.2 aniso8601==9.0.1 annotated-types==0.7.0 anyio==4.4.0 -appnope==0.1.4 argon2-cffi==23.1.0 argon2-cffi-bindings==21.2.0 arrow==1.3.0 @@ -25,9 +24,9 @@ backports-tarfile==1.2.0 beautifulsoup4==4.12.3 bleach==6.1.0 boto3==1.35.7 -boto3-stubs-lite==1.35.12 +boto3-stubs-lite==1.35.14 botocore==1.35.7 -botocore-stubs==1.35.12 +botocore-stubs==1.35.14 buildkite-test-collector==0.1.9 cachetools==5.5.0 caio==0.9.17 @@ -78,7 +77,7 @@ decopatch==1.4.10 decorator==5.1.1 deepdiff==8.0.1 defusedxml==0.7.1 -deltalake==0.19.1 +deltalake==0.19.2 dill==0.3.8 distlib==0.3.8 docker==7.1.0 @@ -87,14 +86,14 @@ duckdb==1.0.0 execnet==2.1.1 executing==2.1.0 fastjsonschema==2.20.0 -filelock==3.15.4 +filelock==3.16.0 fonttools==4.53.1 fqdn==1.5.1 frozenlist==1.4.1 fsspec==2024.3.1 gcsfs==2024.3.1 google-api-core==2.19.2 -google-api-python-client==2.143.0 +google-api-python-client==2.144.0 
google-auth==2.34.0 google-auth-httplib2==0.2.0 google-auth-oauthlib==1.2.1 @@ -108,6 +107,7 @@ gql==3.5.0 graphene==3.3 graphql-core==3.2.4 graphql-relay==3.2.0 +greenlet==3.0.3 grpcio==1.66.1 grpcio-health-checking==1.62.3 grpcio-status==1.62.3 @@ -118,7 +118,7 @@ httplib2==0.22.0 httptools==0.6.1 httpx==0.27.2 humanfriendly==10.0 -hypothesis==6.111.2 +hypothesis==6.112.0 idna==3.8 importlib-metadata==6.11.0 iniconfig==2.0.0 @@ -131,6 +131,7 @@ jaraco-classes==3.4.0 jaraco-context==6.0.1 jaraco-functools==4.0.2 jedi==0.19.1 +jeepney==0.8.0 jinja2==3.1.4 jmespath==1.0.1 joblib==1.4.2 @@ -163,13 +164,14 @@ mdurl==0.1.2 minimal-snowplow-tracker==0.0.2 mistune==3.0.2 mock==3.0.5 -more-itertools==10.4.0 +more-itertools==10.5.0 morefs==0.2.2 msgpack==1.0.8 multidict==6.0.5 multimethod==1.10 mypy==1.11.2 mypy-boto3-ecs==1.35.2 +mypy-boto3-emr-serverless==1.35.0 mypy-boto3-glue==1.35.3 mypy-extensions==1.0.0 mypy-protobuf==3.6.0 @@ -198,7 +200,7 @@ pathspec==0.12.1 pexpect==4.9.0 pillow==10.4.0 pip==24.2 -platformdirs==4.2.2 +platformdirs==4.3.2 pluggy==1.5.0 polars==1.6.0 -e examples/project_fully_featured @@ -215,8 +217,8 @@ pyarrow==17.0.0 pyasn1==0.6.0 pyasn1-modules==0.4.0 pycparser==2.22 -pydantic==2.8.2 -pydantic-core==2.20.1 +pydantic==2.9.0 +pydantic-core==2.23.2 pygments==2.18.0 pyjwt==2.9.0 pylint==3.2.7 @@ -256,22 +258,23 @@ s3transfer==0.10.2 scikit-learn==1.5.1 scipy==1.14.1 seaborn==0.13.2 +secretstorage==3.3.3 send2trash==1.8.3 setuptools==74.1.2 shellingham==1.5.4 six==1.16.0 -slack-sdk==3.31.0 +slack-sdk==3.32.0 sniffio==1.3.1 snowflake-connector-python==3.12.1 snowflake-sqlalchemy==1.5.1 sortedcontainers==2.4.0 soupsieve==2.6 -sqlalchemy==1.4.53 -sqlglot==25.19.0 -sqlglotrs==0.2.10 +sqlalchemy==1.4.54 +sqlglot==25.20.1 +sqlglotrs==0.2.12 sqlparse==0.5.1 stack-data==0.6.3 -starlette==0.38.4 +starlette==0.38.5 structlog==24.4.0 syrupy==4.7.1 tabulate==0.9.0 @@ -283,12 +286,12 @@ tomli==2.0.1 tomlkit==0.13.2 toposort==1.10 tornado==6.4.1 -tox==4.18.0 
+tox==4.18.1 tqdm==4.66.5 traitlets==5.14.3 typeguard==4.3.0 typer==0.12.5 -types-awscrt==0.21.2 +types-awscrt==0.21.5 types-backports==0.1.3 types-certifi==2021.10.8.3 types-cffi==1.16.0.20240331 @@ -297,14 +300,14 @@ types-croniter==3.0.3.20240731 types-cryptography==3.3.23.2 types-mock==5.1.0.20240425 types-paramiko==3.4.0.20240423 -types-protobuf==5.27.0.20240626 +types-protobuf==5.27.0.20240907 types-pyopenssl==24.1.0.20240722 -types-python-dateutil==2.9.0.20240821 +types-python-dateutil==2.9.0.20240906 types-pytz==2024.1.0.20240417 types-pyyaml==6.0.12.20240808 -types-requests==2.32.0.20240905 +types-requests==2.32.0.20240907 types-s3transfer==0.10.2 -types-setuptools==74.0.0.20240831 +types-setuptools==74.1.0.20240907 types-simplejson==3.19.0.20240801 types-six==1.16.21.20240513 types-sqlalchemy==1.4.53.34 @@ -320,7 +323,7 @@ uritemplate==4.1.1 urllib3==2.2.2 uvicorn==0.30.6 uvloop==0.20.0 -virtualenv==20.26.3 +virtualenv==20.26.4 watchdog==5.0.2 watchfiles==0.24.0 wcwidth==0.2.13 @@ -330,5 +333,5 @@ websocket-client==1.8.0 websockets==13.0.1 wheel==0.44.0 wrapt==1.16.0 -yarl==1.9.11 +yarl==1.11.0 zipp==3.20.1 diff --git a/pyright/master/requirements-pinned.txt b/pyright/master/requirements-pinned.txt index 60694065b2f9e..545c8364ad855 100644 --- a/pyright/master/requirements-pinned.txt +++ b/pyright/master/requirements-pinned.txt @@ -25,7 +25,6 @@ apache-airflow-providers-sqlite==3.8.2 apeye==1.4.1 apeye-core==1.1.5 apispec==6.6.1 -appnope==0.1.4 argcomplete==3.5.0 argon2-cffi==23.1.0 argon2-cffi-bindings==21.2.0 @@ -58,10 +57,10 @@ bitmath==1.3.3.1 bleach==6.1.0 blinker==1.8.2 bokeh==3.5.2 -boto3==1.35.12 -boto3-stubs-lite==1.35.12 -botocore==1.35.12 -botocore-stubs==1.35.12 +boto3==1.35.14 +boto3-stubs-lite==1.35.14 +botocore==1.35.14 +botocore-stubs==1.35.14 buildkite-test-collector==0.1.9 cachecontrol==0.14.0 cached-property==1.5.2 @@ -73,7 +72,7 @@ cattrs==24.1.0 celery==5.4.0 certifi==2024.8.30 cffi==1.17.1 -cfn-lint==1.12.3 +cfn-lint==1.12.4 
chardet==5.2.0 charset-normalizer==3.3.2 click==8.1.7 @@ -215,10 +214,10 @@ execnet==2.1.1 executing==2.1.0 expandvars==0.12.0 faiss-cpu==1.8.0 -fastavro==1.9.5 +fastavro==1.9.7 fastjsonschema==2.20.0 -e examples/feature_graph_backed_assets -filelock==3.15.4 +filelock==3.16.0 flask==2.2.5 flask-appbuilder==4.3.6 flask-babel==2.0.0 @@ -239,7 +238,7 @@ gitdb==4.0.11 gitpython==3.1.43 giturlparse==0.12.0 google-api-core==2.19.2 -google-api-python-client==2.143.0 +google-api-python-client==2.144.0 google-auth==2.34.0 google-auth-httplib2==0.2.0 google-auth-oauthlib==1.2.1 @@ -256,6 +255,7 @@ graphql-core==3.2.4 graphql-relay==3.2.0 graphviz==0.20.3 great-expectations==0.17.11 +greenlet==3.0.3 grpcio==1.66.1 grpcio-health-checking==1.62.3 grpcio-status==1.62.3 @@ -270,7 +270,7 @@ httptools==0.6.1 httpx==0.27.2 humanfriendly==10.0 humanize==4.10.0 -hypothesis==6.111.2 +hypothesis==6.112.0 idna==3.8 ijson==3.3.0 imagesize==1.4.1 @@ -321,7 +321,7 @@ langchain-community==0.2.9 langchain-core==0.2.38 langchain-openai==0.1.14 langchain-text-splitters==0.2.4 -langsmith==0.1.114 +langsmith==0.1.116 lazy-object-proxy==1.10.0 leather==0.4.0 limits==3.13.0 @@ -349,16 +349,17 @@ mistune==3.0.2 mixpanel==4.10.1 mlflow==1.27.0 mock==3.0.5 -more-itertools==10.4.0 +more-itertools==10.5.0 morefs==0.2.2 moto==4.2.14 mpmath==1.3.0 -msal==1.30.0 +msal==1.31.0 msal-extensions==1.2.0 msgpack==1.0.8 multidict==6.0.5 multimethod==1.10 mypy-boto3-ecs==1.35.2 +mypy-boto3-emr-serverless==1.35.0 mypy-boto3-glue==1.35.3 mypy-extensions==1.0.0 mypy-protobuf==3.6.0 @@ -375,13 +376,25 @@ noteable-origami==1.1.5 notebook==7.2.2 notebook-shim==0.2.4 numpy==1.26.4 +nvidia-cublas-cu12==12.1.3.1 +nvidia-cuda-cupti-cu12==12.1.105 +nvidia-cuda-nvrtc-cu12==12.1.105 +nvidia-cuda-runtime-cu12==12.1.105 +nvidia-cudnn-cu12==9.1.0.70 +nvidia-cufft-cu12==11.0.2.54 +nvidia-curand-cu12==10.3.2.106 +nvidia-cusolver-cu12==11.4.5.107 +nvidia-cusparse-cu12==12.1.0.106 +nvidia-nccl-cu12==2.20.5 
+nvidia-nvjitlink-cu12==12.6.68 +nvidia-nvtx-cu12==12.1.105 oauth2client==4.1.3 oauthlib==3.2.2 objgraph==3.6.1 onnx==1.16.2 onnxconverter-common==1.13.0 onnxruntime==1.19.2 -openai==1.43.0 +openai==1.44.0 openapi-schema-validator==0.6.2 openapi-spec-validator==0.7.1 opentelemetry-api==1.27.0 @@ -418,7 +431,7 @@ pexpect==4.9.0 pillow==10.4.0 pip==24.2 pkginfo==1.11.1 -platformdirs==4.2.2 +platformdirs==4.3.2 plotly==5.24.0 pluggy==1.5.0 ply==3.11 @@ -498,7 +511,7 @@ rich-argparse==1.5.2 rpds-py==0.20.0 rsa==4.9 ruamel-yaml==0.17.17 -ruff==0.6.3 +ruff==0.6.4 s3transfer==0.10.2 scikit-learn==1.5.1 scipy==1.14.1 @@ -517,9 +530,9 @@ simplejson==3.19.3 six==1.16.0 skein==0.8.2 skl2onnx==1.17.0 -slack-sdk==3.31.0 +slack-sdk==3.32.0 sling==1.2.18 -sling-mac-arm64==1.2.18 +sling-linux-amd64==1.2.18 smmap==5.0.1 sniffio==1.3.1 snowballstemmer==2.2.0 @@ -528,7 +541,7 @@ snowflake-sqlalchemy==1.6.1 sortedcontainers==2.4.0 soupsieve==2.6 sphinx==8.0.2 -sphinx-autodoc-typehints==2.3.0 +sphinx-autodoc-typehints==2.4.0 sphinx-jinja2-compat==0.3.0 sphinx-prompt==1.9.0 sphinx-tabs==3.4.5 @@ -539,16 +552,16 @@ sphinxcontrib-htmlhelp==2.1.0 sphinxcontrib-jsmath==1.0.1 sphinxcontrib-qthelp==2.0.0 sphinxcontrib-serializinghtml==2.0.0 -sqlalchemy==1.4.53 +sqlalchemy==1.4.54 sqlalchemy-jsonfield==1.0.2 sqlalchemy-utils==0.41.2 -sqlglot==25.19.0 -sqlglotrs==0.2.10 +sqlglot==25.20.1 +sqlglotrs==0.2.12 sqlparse==0.5.1 sshpubkeys==3.3.1 sshtunnel==0.4.0 stack-data==0.6.3 -starlette==0.38.4 +starlette==0.38.5 structlog==24.4.0 sympy==1.13.2 syrupy==4.7.1 @@ -575,6 +588,7 @@ tqdm==4.66.5 traitlets==5.14.3 trio==0.26.2 trio-websocket==0.11.1 +triton==3.0.0 -e examples/experimental/dagster-airlift/examples/tutorial-example -e examples/tutorial_notebook_assets twilio==9.3.0 @@ -582,7 +596,7 @@ twine==1.15.0 typeguard==4.3.0 typepy==1.3.2 typer==0.12.5 -types-awscrt==0.21.2 +types-awscrt==0.21.5 types-backports==0.1.3 types-certifi==2021.10.8.3 types-cffi==1.16.0.20240331 @@ -591,14 +605,14 @@ 
types-croniter==3.0.3.20240731 types-cryptography==3.3.23.2 types-mock==5.1.0.20240425 types-paramiko==3.4.0.20240423 -types-protobuf==5.27.0.20240626 +types-protobuf==5.27.0.20240907 types-pyopenssl==24.1.0.20240722 -types-python-dateutil==2.9.0.20240821 +types-python-dateutil==2.9.0.20240906 types-pytz==2024.1.0.20240417 types-pyyaml==6.0.12.20240808 types-requests==2.31.0.6 types-s3transfer==0.10.2 -types-setuptools==74.0.0.20240831 +types-setuptools==74.1.0.20240907 types-simplejson==3.19.0.20240801 types-six==1.16.21.20240513 types-sqlalchemy==1.4.53.34 @@ -612,7 +626,7 @@ tzdata==2024.1 tzlocal==5.2 uc-micro-py==1.0.3 unicodecsv==0.14.1 -universal-pathlib==0.2.3 +universal-pathlib==0.2.5 uri-template==1.3.0 uritemplate==4.1.1 urllib3==1.26.20 @@ -621,7 +635,7 @@ uvicorn==0.30.6 uvloop==0.20.0 vine==5.1.0 virtualenv==20.25.0 -wandb==0.17.8 +wandb==0.17.9 watchdog==5.0.2 watchfiles==0.24.0 wcwidth==0.2.13 @@ -644,6 +658,6 @@ wtforms==3.0.1 xgboost==2.1.1 xmltodict==0.12.0 xyzservices==2024.9.0 -yarl==1.9.11 +yarl==1.11.0 zict==3.0.0 zipp==3.20.1 diff --git a/python_modules/libraries/dagster-aws/dagster_aws/pipes/__init__.py b/python_modules/libraries/dagster-aws/dagster_aws/pipes/__init__.py index e513f5cc16adf..263d3aa48b6d1 100644 --- a/python_modules/libraries/dagster-aws/dagster_aws/pipes/__init__.py +++ b/python_modules/libraries/dagster-aws/dagster_aws/pipes/__init__.py @@ -1,4 +1,9 @@ -from dagster_aws.pipes.clients import PipesECSClient, PipesGlueClient, PipesLambdaClient +from dagster_aws.pipes.clients import ( + PipesECSClient, + PipesEMRServerlessClient, + PipesGlueClient, + PipesLambdaClient, +) from dagster_aws.pipes.context_injectors import ( PipesLambdaEventContextInjector, PipesS3ContextInjector, @@ -18,4 +23,5 @@ "PipesS3MessageReader", "PipesLambdaLogsMessageReader", "PipesCloudWatchMessageReader", + "PipesEMRServerlessClient", ] diff --git a/python_modules/libraries/dagster-aws/dagster_aws/pipes/clients/__init__.py 
b/python_modules/libraries/dagster-aws/dagster_aws/pipes/clients/__init__.py
index b7625af2e2cfb..a6711bbf1ce82 100644
--- a/python_modules/libraries/dagster-aws/dagster_aws/pipes/clients/__init__.py
+++ b/python_modules/libraries/dagster-aws/dagster_aws/pipes/clients/__init__.py
@@ -1,5 +1,6 @@
 from dagster_aws.pipes.clients.ecs import PipesECSClient
+from dagster_aws.pipes.clients.emr_serverless import PipesEMRServerlessClient
 from dagster_aws.pipes.clients.glue import PipesGlueClient
 from dagster_aws.pipes.clients.lambda_ import PipesLambdaClient
 
-__all__ = ["PipesGlueClient", "PipesLambdaClient", "PipesECSClient"]
+__all__ = ["PipesGlueClient", "PipesLambdaClient", "PipesECSClient", "PipesEMRServerlessClient"]
diff --git a/python_modules/libraries/dagster-aws/dagster_aws/pipes/clients/emr_serverless.py b/python_modules/libraries/dagster-aws/dagster_aws/pipes/clients/emr_serverless.py
new file mode 100644
index 0000000000000..35fa7ff749332
--- /dev/null
+++ b/python_modules/libraries/dagster-aws/dagster_aws/pipes/clients/emr_serverless.py
@@ -0,0 +1,299 @@
+import sys
+import time
+from typing import TYPE_CHECKING, Any, Dict, Optional, cast
+
+import boto3
+import dagster._check as check
+from dagster import DagsterInvariantViolationError, PipesClient
+from dagster._annotations import public
+from dagster._core.definitions.resource_annotation import TreatAsResourceParam
+from dagster._core.errors import DagsterExecutionInterruptedError
+from dagster._core.execution.context.compute import OpExecutionContext
+from dagster._core.pipes.client import (
+    PipesClientCompletedInvocation,
+    PipesContextInjector,
+    PipesMessageReader,
+)
+from dagster._core.pipes.context import PipesSession
+from dagster._core.pipes.utils import PipesEnvContextInjector, open_pipes_session
+
+from dagster_aws.pipes.message_readers import PipesCloudWatchMessageReader
+
+if TYPE_CHECKING:
+    from mypy_boto3_emr_serverless.client import EMRServerlessClient
+    from mypy_boto3_emr_serverless.type_defs import (
+        GetJobRunResponseTypeDef,
+        JobRunStateType,
+        StartJobRunRequestRequestTypeDef,
+        StartJobRunResponseTypeDef,
+    )
+
+AWS_SERVICE_NAME = "EMR Serverless"
+
+
+class PipesEMRServerlessClient(PipesClient, TreatAsResourceParam):
+    """A pipes client for running workloads on AWS EMR Serverless.
+
+    Args:
+        client (Optional[boto3.client]): The boto3 AWS EMR Serverless client used to interact with AWS EMR Serverless.
+        context_injector (Optional[PipesContextInjector]): A context injector to use to inject
+            context into AWS EMR Serverless workload. Defaults to :py:class:`PipesEnvContextInjector`.
+        message_reader (Optional[PipesMessageReader]): A message reader to use to read messages
+            from the AWS EMR Serverless workload. Defaults to :py:class:`PipesCloudWatchMessageReader`.
+        forward_termination (bool): Whether to cancel the AWS EMR Serverless workload if the Dagster process receives a termination signal.
+        poll_interval (float): The interval in seconds to poll the AWS EMR Serverless workload for status updates. Defaults to 5 seconds.
+    """
+
+    AWS_SERVICE_NAME = AWS_SERVICE_NAME
+
+    def __init__(
+        self,
+        client=None,
+        context_injector: Optional[PipesContextInjector] = None,
+        message_reader: Optional[PipesMessageReader] = None,
+        forward_termination: bool = True,
+        poll_interval: float = 5.0,
+    ):
+        # "emr-serverless" is the boto3 service identifier for EMR Serverless
+        self._client = client or boto3.client("emr-serverless")
+        self._context_injector = context_injector or PipesEnvContextInjector()
+        self._message_reader = message_reader or PipesCloudWatchMessageReader()
+        self.forward_termination = check.bool_param(forward_termination, "forward_termination")
+        self.poll_interval = poll_interval
+
+    @property
+    def client(self) -> "EMRServerlessClient":
+        return self._client
+
+    @property
+    def context_injector(self) -> PipesContextInjector:
+        return self._context_injector
+
+    @property
+    def message_reader(self) -> PipesMessageReader:
+        return self._message_reader
+
+    @classmethod
+    def _is_dagster_maintained(cls) -> bool:
+        return True
+
+    @public
+    def run(
+        self,
+        *,
+        context: OpExecutionContext,
+        start_job_run_params: "StartJobRunRequestRequestTypeDef",
+        extras: Optional[Dict[str, Any]] = None,
+    ) -> PipesClientCompletedInvocation:
+        """Run a workload on AWS EMR Serverless, enriched with the pipes protocol.
+
+        Args:
+            context (OpExecutionContext): The context of the currently executing Dagster op or asset.
+            start_job_run_params (dict): Parameters for the ``start_job_run`` boto3 AWS EMR Serverless client call.
+                See `Boto3 API Documentation <https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/emr-serverless/client/start_job_run.html>`_
+            extras (Optional[Dict[str, Any]]): Additional information to pass to the Pipes session in the external process.
+
+        Returns:
+            PipesClientCompletedInvocation: Wrapper containing results reported by the external
+                process.
+        """
+        with open_pipes_session(
+            context=context,
+            message_reader=self.message_reader,
+            context_injector=self.context_injector,
+            extras=extras,
+        ) as session:
+            start_job_run_params = self._enrich_start_params(context, session, start_job_run_params)
+            start_response = self._start(context, start_job_run_params)
+            try:
+                completion_response = self._wait_for_completion(context, start_response)
+                context.log.info(f"[pipes] {self.AWS_SERVICE_NAME} workload is complete!")
+                self._read_messages(context, completion_response)
+                return PipesClientCompletedInvocation(session)
+
+            except DagsterExecutionInterruptedError:
+                if self.forward_termination:
+                    context.log.warning(
+                        f"[pipes] Dagster process interrupted! Will terminate external {self.AWS_SERVICE_NAME} workload."
+                    )
+                    self._terminate(context, start_response)
+                raise
+
+    def _enrich_start_params(
+        self,
+        context: OpExecutionContext,
+        session: PipesSession,
+        params: "StartJobRunRequestRequestTypeDef",
+    ) -> "StartJobRunRequestRequestTypeDef":
+        """Return a copy of ``params`` with Dagster tags and Pipes bootstrap env vars injected."""
+        # work on copies so the caller's dict (and its nested ``jobDriver``) is never mutated;
+        # otherwise re-using one params dict across runs would accumulate duplicate --conf flags
+        enriched: Dict[str, Any] = dict(params)
+        job_driver: Dict[str, Any] = dict(enriched.get("jobDriver") or {})
+        spark_submit: Dict[str, Any] = dict(job_driver.get("sparkSubmit") or {})
+
+        # inject Dagster tags
+        enriched["tags"] = {**enriched.get("tags", {}), "dagster/run_id": context.run_id}
+
+        # inject env variables via --conf spark.emr-serverless.driverEnv.<key>=<value>
+        dagster_env_vars = session.get_bootstrap_env_vars()
+        existing_spark_params = spark_submit.get("sparkSubmitParameters", "")
+        spark_submit["sparkSubmitParameters"] = existing_spark_params + "".join(
+            f" --conf spark.emr-serverless.driverEnv.{key}={value}"
+            for key, value in dagster_env_vars.items()
+        )
+
+        job_driver["sparkSubmit"] = spark_submit
+        enriched["jobDriver"] = job_driver
+
+        return cast("StartJobRunRequestRequestTypeDef", enriched)
+
+    def _start(
+        self, context: OpExecutionContext, params: "StartJobRunRequestRequestTypeDef"
+    ) -> "StartJobRunResponseTypeDef":
+        """Submit the job run via ``start_job_run`` and log the resulting job run id."""
+        response = self._client.start_job_run(**params)
+        job_run_id = response["jobRunId"]
+        context.log.info(
+            f"[pipes] {self.AWS_SERVICE_NAME} job started with job_run_id {job_run_id}"
+        )
+        return response
+
+    def _wait_for_completion(
+        self, context: OpExecutionContext, start_response: "StartJobRunResponseTypeDef"
+    ) -> "GetJobRunResponseTypeDef":  # pyright: ignore(reportReturnType)
+        """Poll ``get_job_run`` every ``poll_interval`` seconds until the job run is terminal.
+
+        Returns the final ``get_job_run`` response on ``SUCCESS``. Raises ``RuntimeError`` when the
+        job run is ``FAILED``, ``CANCELLED`` or ``CANCELLING``, and
+        ``DagsterInvariantViolationError`` for a state this client does not know about.
+        """
+        job_run_id = start_response["jobRunId"]
+
+        while response := self._client.get_job_run(
+            applicationId=start_response["applicationId"],
+            jobRunId=job_run_id,
+        ):
+            state: "JobRunStateType" = response["jobRun"]["state"]
+
+            if state in ["FAILED", "CANCELLED", "CANCELLING"]:
+                context.log.error(
+                    f"[pipes] {self.AWS_SERVICE_NAME} job {job_run_id} terminated with state: {state}. Details:\n{response['jobRun'].get('stateDetails')}"
+                )
+                raise RuntimeError(
+                    f"{self.AWS_SERVICE_NAME} job failed"
+                )  # TODO: introduce something like DagsterPipesRemoteExecutionError
+            elif state == "SUCCESS":
+                context.log.info(
+                    f"[pipes] {self.AWS_SERVICE_NAME} job {job_run_id} completed with state: {state}"
+                )
+                return response
+            elif state in ["PENDING", "SUBMITTED", "SCHEDULED", "QUEUED", "RUNNING"]:
+                # non-terminal states: keep polling (RUNNING is the normal in-flight state)
+                time.sleep(self.poll_interval)
+                continue
+            else:
+                raise DagsterInvariantViolationError(
+                    f"Unexpected state for AWS EMR Serverless job {job_run_id}: {state}"
+                )
+
+    def _read_messages(self, context: OpExecutionContext, response: "GetJobRunResponseTypeDef"):
+        """Replay the finished job run's driver CloudWatch logs through the message reader.
+
+        Only :py:class:`PipesCloudWatchMessageReader` is supported; for any other reader, or when
+        CloudWatch logging is not enabled for the job run, a warning is logged and nothing is read.
+        """
+        application_id = response["jobRun"]["applicationId"]
+        job_id = response["jobRun"]["jobRunId"]
+
+        application = self.client.get_application(applicationId=application_id)["application"]
+
+        application_type = application["type"]
+
+        # compare case-insensitively: the type may be spelled "SPARK" as well as "Spark"
+        if application_type.upper() == "SPARK":
+            worker_type = "SPARK_DRIVER"
+        elif application_type.upper() == "HIVE":
+            worker_type = "HIVE_DRIVER"
+        else:
+            raise NotImplementedError(f"Application type {application_type} is not supported")
+
+        if not isinstance(self.message_reader, PipesCloudWatchMessageReader):
+            context.log.warning(
+                f"[pipes] {self.message_reader} is not supported for {self.AWS_SERVICE_NAME}. Dagster won't be able to receive logs and messages from the job."
+            )
+            return
+
+        # https://docs.aws.amazon.com/emr/latest/EMR-Serverless-UserGuide/logging.html#jobs-log-storage-cw
+
+        # we can get cloudwatch logs from the known log group
+        # (a job run may carry no monitoring overrides at all, hence the .get() chain)
+        cloudwatch_configuration = (
+            response["jobRun"]
+            .get("configurationOverrides", {})
+            .get("monitoringConfiguration", {})
+            .get("cloudWatchLoggingConfiguration", {})
+        )
+        if not cloudwatch_configuration.get("enabled"):
+            context.log.warning(
+                f"[pipes] Recieved {self.message_reader}, but CloudWatch logging is not enabled for {self.AWS_SERVICE_NAME} job. Dagster won't be able to receive logs and messages from the job."
+            )
+            return
+
+        if log_types := cloudwatch_configuration.get("logTypes"):
+            # get the configured output streams
+            # but limit them with "stdout" and "stderr"
+            # (normalise case first: log types are upper case, stream suffixes are lower case)
+            configured_streams = {
+                log_type.lower() for log_type in log_types.get(worker_type, ["STDOUT", "STDERR"])
+            }
+            output_streams = [
+                stream for stream in ("stdout", "stderr") if stream in configured_streams
+            ]
+        else:
+            output_streams = ["stdout", "stderr"]
+
+        log_group = (
+            cloudwatch_configuration.get("logGroupName")
+            or application.get("monitoringConfiguration", {})
+            .get("cloudWatchLoggingConfiguration", {})
+            .get("logGroupName")
+            or "/aws/emr-serverless"
+        )
+
+        attempt = response["jobRun"].get("attempt")
+
+        if attempt is not None and attempt > 1:
+            log_stream = (
+                f"/applications/{application_id}/jobs/{job_id}/attempts/{attempt}/{worker_type}"
+            )
+        else:
+            log_stream = f"/applications/{application_id}/jobs/{job_id}/{worker_type}"
+
+        if log_stream_prefix := cloudwatch_configuration.get("logStreamNamePrefix"):
+            log_stream = f"{log_stream_prefix}{log_stream}"
+
+        output_files = {
+            "stdout": sys.stdout,
+            "stderr": sys.stderr,
+        }
+
+        # TODO: do this in a background thread in real-time once https://github.com/dagster-io/dagster/pull/24098 is merged
+        for output_stream in output_streams:
+            output_file = output_files[output_stream]
+            context.log.debug(
+                f"[pipes] Reading AWS CloudWatch logs from group {log_group} stream {log_stream}/{output_stream}"
+            )
+            self.message_reader.consume_cloudwatch_logs(
+                log_group,
+                f"{log_stream}/{output_stream}",
+                start_time=int(response["jobRun"]["attemptCreatedAt"].timestamp() * 1000),
+                output_file=output_file,
+            )
+
+    def _terminate(self, context: OpExecutionContext, start_response: "StartJobRunResponseTypeDef"):
+        """Cancel the job run referenced by ``start_response``."""
+        # cancel_job_run needs the application id in addition to the job run id
+        application_id, job_run_id = start_response["applicationId"], start_response["jobRunId"]
+        context.log.info(f"[pipes] Terminating {self.AWS_SERVICE_NAME} job run {job_run_id}")
+        self._client.cancel_job_run(applicationId=application_id, jobRunId=job_run_id)
diff --git a/python_modules/libraries/dagster-aws/dagster_aws_tests/pipes_tests/test_pipes.py b/python_modules/libraries/dagster-aws/dagster_aws_tests/pipes_tests/test_pipes.py
index 134c7378cbac7..656b627af270b 100644
--- a/python_modules/libraries/dagster-aws/dagster_aws_tests/pipes_tests/test_pipes.py
+++ b/python_modules/libraries/dagster-aws/dagster_aws_tests/pipes_tests/test_pipes.py
@@ -10,7 +10,8 @@
 import time
 from contextlib import contextmanager
 from tempfile import NamedTemporaryFile
-from typing import Any, Callable, Iterator, Literal
+from typing import Any, Callable, Iterator, Literal, Tuple
+from uuid import uuid4
 
 import boto3
 import pytest
@@ -30,6 +31,7 @@
 from dagster_aws.pipes import (
     PipesCloudWatchMessageReader,
     PipesECSClient,
+    PipesEMRServerlessClient,
     PipesGlueClient,
     PipesLambdaClient,
     PipesLambdaLogsMessageReader,
@@ -38,6 +40,7 @@
 )
 from moto.server import ThreadedMotoServer  # type: ignore # (pyright bug)
 from mypy_boto3_ecs import ECSClient
+from mypy_boto3_emr_serverless import EMRServerlessClient
 
 from dagster_aws_tests.pipes_tests.fake_ecs import LocalECSMockClient
 from dagster_aws_tests.pipes_tests.fake_glue import LocalGlueMockClient
@@ -772,3 +775,71 @@ def
materialize_asset(env, return_dict):
         # breakpoint()
         assert return_dict[0]["tasks"][0]["containers"][0]["exitCode"] == 1
         assert return_dict[0]["tasks"][0]["stoppedReason"] == "Dagster process was interrupted"
+
+
+EMR_SERVERLESS_APP_NAME = "Example"  # NOTE(review): not referenced in this hunk
+
+
+@pytest.fixture
+def emr_serverless_setup(
+    moto_server, external_s3_glue_script, s3_client  # NOTE(review): not referenced in the body
+) -> Tuple[EMRServerlessClient, str]:
+    client = boto3.client("emr-serverless", region_name="us-east-1", endpoint_url=_MOTO_SERVER_URL)
+    resp = client.create_application(
+        type="SPARK",
+        releaseLabel="emr-7.2.0-latest",
+        clientToken=str(uuid4()),  # fresh random token for every fixture invocation
+    )
+    return client, resp["applicationId"]  # (client, application id) pair consumed by the test
+
+
+def test_emr_serverless_manual(emr_serverless_setup: Tuple[EMRServerlessClient, str]):
+    client, application_id = emr_serverless_setup
+
+    @asset  # the assertions live inside an asset so they run with a real execution context
+    def my_asset(context: AssetExecutionContext, emr_serverless_client: PipesEMRServerlessClient):
+        message_reader = PipesCloudWatchMessageReader()
+        context_injector = PipesEnvContextInjector()
+
+        with open_pipes_session(
+            context=context,
+            message_reader=message_reader,
+            context_injector=context_injector,
+        ) as session:
+            params = {
+                "applicationId": application_id,
+                "executionRoleArn": "arn:aws:iam::123456789012:role/EMRServerlessRole",
+                "jobDriver": {
+                    "sparkSubmit": {
+                        "entryPoint": "s3://my-bucket/my-script.py",
+                    }
+                },
+            }
+            params = emr_serverless_client._enrich_start_params(  # noqa: SLF001
+                context=context, session=session, params=params
+            )
+
+            assert params["tags"]["dagster/run_id"] == context.run_id
+            assert (
+                "--conf spark.emr-serverless.driverEnv.DAGSTER_PIPES_CONTEXT="
+                in params["jobDriver"]["sparkSubmit"]["sparkSubmitParameters"]
+            )
+            assert (
+                "--conf spark.emr-serverless.driverEnv.DAGSTER_PIPES_MESSAGES="
+                in params["jobDriver"]["sparkSubmit"]["sparkSubmitParameters"]
+            )
+
+            # moto doesn't have start_job_run implemented so this is as far as we can get with it right now
+
+            return session.get_results()  # NOTE(review): no job was started; presumably empty
+
+    with instance_for_test() as instance:
+        materialize(
+            [my_asset],
+            resources={
+                "emr_serverless_client": PipesEMRServerlessClient(
+                    client=client,
+                )
+            },
+            instance=instance,
+        )
diff --git a/python_modules/libraries/dagster-aws/setup.py b/python_modules/libraries/dagster-aws/setup.py
index 1808a13fcd820..cbd4ac61e160b 100644
--- a/python_modules/libraries/dagster-aws/setup.py
+++ b/python_modules/libraries/dagster-aws/setup.py
@@ -37,7 +37,7 @@ def get_version() -> str:
     python_requires=">=3.8,<3.13",
     install_requires=[
         "boto3",
-        "boto3-stubs-lite[ecs,glue]",
+        "boto3-stubs-lite[ecs,glue,emr-serverless]",
         f"dagster{pin}",
         "packaging",
         "requests",
@@ -47,7 +47,7 @@ def get_version() -> str:
         "pyspark": ["dagster-pyspark"],
         "test": [
             "botocore!=1.32.1",
-            "moto[s3,server,glue]>=2.2.8,<5.0",
+            "moto[s3,server,glue,emrserverless]>=2.2.8,<5.0",
             "requests-mock",
             "xmltodict==0.12.0",  # pinned until moto>=3.1.9 (https://github.com/spulec/moto/issues/5112)
         ],