From ca96732847dac63685eedc3e1aa90d74948ece20 Mon Sep 17 00:00:00 2001
From: "t.kodama" <118150392+t0momi219@users.noreply.github.com>
Date: Fri, 27 Sep 2024 01:16:10 +0900
Subject: [PATCH] URL encode dataset names to support multibyte characters
 (#1198)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

In projects containing models whose names include multibyte characters, such as the following, dataset creation fails and an error is raised during execution.

```txt
└── dbt
    └── my_project
        └── models
            ├── 日本語名モデル.sql
            └── 日本語名モデル.yml
```

```
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/datasets/__init__.py", line 78, in _sanitize_uri
    raise ValueError("Dataset URI must only consist of ASCII characters")
ValueError: Dataset URI must only consist of ASCII characters
```

To support model names with multibyte characters, this change URL-encodes the names before building the dataset URI (a standalone sketch of the encoding follows the diff).

closes: https://github.com/astronomer/astronomer-cosmos/issues/1197

Co-authored-by: Tatiana Al-Chueyr
---
 cosmos/operators/local.py                     |  3 +-
 ...2\357\275\231\357\275\224\357\275\205.sql" |  2 ++
 docs/configuration/render-config.rst          |  2 +-
 tests/operators/test_local.py                 | 30 +++++++++++++++++++
 4 files changed, 35 insertions(+), 2 deletions(-)
 create mode 100644 "dev/dags/dbt/simple/models/\357\275\215\357\275\225\357\275\214\357\275\224\357\275\211\357\275\202\357\275\231\357\275\224\357\275\205.sql"

diff --git a/cosmos/operators/local.py b/cosmos/operators/local.py
index 2ba2b18ff..701552f56 100644
--- a/cosmos/operators/local.py
+++ b/cosmos/operators/local.py
@@ -3,6 +3,7 @@
 import json
 import os
 import tempfile
+import urllib.parse
 import warnings
 from abc import ABC, abstractmethod
 from functools import cached_property
@@ -449,7 +450,7 @@ def get_datasets(self, source: Literal["inputs", "outputs"]) -> list[Dataset]:
         uris = []
         for completed in self.openlineage_events_completes:
             for output in getattr(completed, source):
-                dataset_uri = output.namespace + "/" + output.name
+                dataset_uri = output.namespace + "/" + urllib.parse.quote(output.name)
                 uris.append(dataset_uri)
 
         self.log.debug("URIs to be converted to Dataset: %s", uris)
diff --git "a/dev/dags/dbt/simple/models/\357\275\215\357\275\225\357\275\214\357\275\224\357\275\211\357\275\202\357\275\231\357\275\224\357\275\205.sql" "b/dev/dags/dbt/simple/models/\357\275\215\357\275\225\357\275\214\357\275\224\357\275\211\357\275\202\357\275\231\357\275\224\357\275\205.sql"
new file mode 100644
index 000000000..b017c8d0c
--- /dev/null
+++ "b/dev/dags/dbt/simple/models/\357\275\215\357\275\225\357\275\214\357\275\224\357\275\211\357\275\202\357\275\231\357\275\224\357\275\205.sql"
@@ -0,0 +1,2 @@
+select
+    'TEST_FOR_MULTIBYTE_CHARACTERS'
diff --git a/docs/configuration/render-config.rst b/docs/configuration/render-config.rst
index 4b2535e07..068998de5 100644
--- a/docs/configuration/render-config.rst
+++ b/docs/configuration/render-config.rst
@@ -7,7 +7,7 @@ It does this by exposing a ``cosmos.config.RenderConfig`` class that you can use
 
 The ``RenderConfig`` class takes the following arguments:
 
-- ``emit_datasets``: whether or not to emit Airflow datasets to be used for data-aware scheduling. Defaults to True. Depends on `additional dependencies `_.
+- ``emit_datasets``: whether or not to emit Airflow datasets to be used for data-aware scheduling. Defaults to True. Depends on `additional dependencies `_. If a model in the project has a name containing multibyte characters, the dataset name will be URL-encoded.
 - ``test_behavior``: how to run tests. Defaults to running a model's tests immediately after the model is run. For more information, see the `Testing Behavior `_ section.
 - ``load_method``: how to load your dbt project. See `Parsing Methods `_ for more information.
 - ``select`` and ``exclude``: which models to include or exclude from your DAGs. See `Selecting & Excluding `_ for more information.
diff --git a/tests/operators/test_local.py b/tests/operators/test_local.py
index effd604fa..d54bbb5e1 100644
--- a/tests/operators/test_local.py
+++ b/tests/operators/test_local.py
@@ -489,6 +489,36 @@ def test_run_operator_dataset_emission_is_skipped(caplog):
     assert run_operator.outlets == []
 
 
+@pytest.mark.skipif(
+    version.parse(airflow_version) < version.parse("2.4")
+    or version.parse(airflow_version) in PARTIALLY_SUPPORTED_AIRFLOW_VERSIONS,
+    reason="Airflow DAG did not have datasets until the 2.4 release, inlets and outlets do not work by default in Airflow 2.9.0 and 2.9.1",
+)
+@pytest.mark.integration
+def test_run_operator_dataset_url_encoded_names(caplog):
+    from airflow.datasets import Dataset
+
+    with DAG("test-id-1", start_date=datetime(2022, 1, 1)) as dag:
+        run_operator = DbtRunLocalOperator(
+            profile_config=real_profile_config,
+            project_dir=Path(__file__).parent.parent.parent / "dev/dags/dbt/simple",
+            task_id="run",
+            dbt_cmd_flags=["--models", "multibyte"],
+            install_deps=True,
+            append_env=True,
+        )
+        run_operator
+
+    run_test_dag(dag)
+
+    assert run_operator.outlets == [
+        Dataset(
+            uri="postgres://0.0.0.0:5432/postgres.public.%EF%BD%8D%EF%BD%95%EF%BD%8C%EF%BD%94%EF%BD%89%EF%BD%82%EF%BD%99%EF%BD%94%EF%BD%85",
+            extra=None,
+        )
+    ]
+
+
 @pytest.mark.integration
 def test_run_operator_caches_partial_parsing(caplog, tmp_path):
     caplog.set_level(logging.DEBUG)
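
Below is a minimal standalone sketch of the encoding behavior this patch introduces, runnable outside Airflow. The `namespace` and `name` values are illustrative, mirroring the integration test above; a real run derives them from the emitted OpenLineage events.

```python
import urllib.parse

# Illustrative OpenLineage values mirroring the integration test above.
namespace = "postgres://0.0.0.0:5432"
name = "postgres.public.ｍｕｌｔｉｂｙｔｅ"  # fullwidth (multibyte) model name

# Before this patch: the raw name goes straight into the URI, and Airflow's
# _sanitize_uri rejects it because the URI contains non-ASCII characters.
raw_uri = namespace + "/" + name
assert not raw_uri.isascii()

# After this patch: percent-encoding the name yields a pure-ASCII URI.
encoded_uri = namespace + "/" + urllib.parse.quote(name)
assert encoded_uri.isascii()

print(encoded_uri)
# postgres://0.0.0.0:5432/postgres.public.%EF%BD%8D%EF%BD%95%EF%BD%8C%EF%BD%94%EF%BD%89%EF%BD%82%EF%BD%99%EF%BD%94%EF%BD%85
```

Note that `urllib.parse.quote` leaves unreserved ASCII characters (letters, digits, `_`, `.`, `-`, `~`) and `/` untouched, so dataset URIs for plain ASCII model names are unchanged by this patch.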