From a05ea7e3a917e3edc4d32825b35c948ee86c4573 Mon Sep 17 00:00:00 2001
From: Simon Brugman
Date: Mon, 11 Sep 2023 17:07:17 +0200
Subject: [PATCH 1/4] feat: kedro airflow convert --all option

Signed-off-by: Simon Brugman
---
 kedro-airflow/README.md               |   5 +-
 kedro-airflow/RELEASE.md              |   9 ++
 kedro-airflow/kedro_airflow/plugin.py | 137 ++++++++++++++------
 kedro-airflow/tests/test_plugin.py    |  39 ++++++++
 4 files changed, 127 insertions(+), 63 deletions(-)

diff --git a/kedro-airflow/README.md b/kedro-airflow/README.md
index b61ed141d..9cc006bb3 100644
--- a/kedro-airflow/README.md
+++ b/kedro-airflow/README.md
@@ -32,10 +32,12 @@ kedro airflow create
 ```
 
 This command will generate an Airflow DAG file located in the `airflow_dags/` directory in your project. You can pass a `--pipeline` flag to generate the DAG file for a specific Kedro pipeline and an `--env` flag to generate the DAG file for a specific Kedro environment.
+Passing `--all` will convert all registered Kedro pipelines to Airflow DAGs.
 
 ### Step 2: Copy the DAG file to the Airflow DAGs folder.
 
 For more information about the DAGs folder, please visit [Airflow documentation](https://airflow.apache.org/docs/stable/concepts.html#dags).
+The Airflow DAG configuration can be customized by editing this file.
 
 ### Step 3: Package and install the Kedro pipeline in the Airflow executor's environment
@@ -101,8 +103,9 @@ For instance, if you would like to use the name `scheduler`, then change the fil
 CONFIG_LOADER_ARGS = {
     "config_patterns": {"airflow": ["scheduler*", "scheduler/**"]}
 }
+```
 
-Follow Kedro's official documentation, to see how to add templating, custom resolvers etc. (https://docs.kedro.org/en/stable/configuration/advanced_configuration.html#how-to-do-templating-with-the-omegaconfigloader)[https://docs.kedro.org/en/stable/configuration/advanced_configuration.html#how-to-do-templating-with-the-omegaconfigloader]
+Follow Kedro's [official documentation](https://docs.kedro.org/en/stable/configuration/advanced_configuration.html#how-to-do-templating-with-the-omegaconfigloader) to see how to add templating, custom resolvers, etc.
 
 #### What if I want to pass different arguments?
diff --git a/kedro-airflow/RELEASE.md b/kedro-airflow/RELEASE.md
index 0ea332f2b..7eca5fc3d 100755
--- a/kedro-airflow/RELEASE.md
+++ b/kedro-airflow/RELEASE.md
@@ -1,6 +1,15 @@
 # Upcoming Release
 * Added support for Python 3.11
 
+# Release 0.6.1
+* Added the `--all` CLI argument to `kedro-airflow` to convert all registered pipelines at once.
+* Simplified the output of the `kedro airflow create` command.
+
+## Community contributions
+Many thanks to the following Kedroids for contributing PRs to this release:
+
+* [sbrugman](https://github.com/sbrugman)
+
 # Release 0.6.0
 * Change reference to `kedro.pipeline.Pipeline` object throughout test suite with `kedro.modular_pipeline.pipeline` factory.
 * Migrate all project metadata to static `pyproject.toml`.
diff --git a/kedro-airflow/kedro_airflow/plugin.py b/kedro-airflow/kedro_airflow/plugin.py
index 569e91be2..442f3ea3e 100644
--- a/kedro-airflow/kedro_airflow/plugin.py
+++ b/kedro-airflow/kedro_airflow/plugin.py
@@ -18,7 +18,10 @@ from slugify import slugify
 
 PIPELINE_ARG_HELP = """Name of the registered pipeline to convert.
-If not set, the '__default__' pipeline is used."""
+If not set, the '__default__' pipeline is used. This argument supports
+passing multiple values using `--pipeline [p1] --pipeline [p2]`.
+Use the `--all` flag to convert all registered pipelines at once."""
+ALL_ARG_HELP = """Convert all registered pipelines at once."""
 
 
 @click.group(name="Kedro-Airflow")
@@ -32,7 +35,7 @@ def airflow_commands():
     pass
 
 
-def _load_config(context: KedroContext, pipeline_name: str) -> dict[str, Any]:
+def _load_config(context: KedroContext) -> dict[str, Any]:
     # Set the default pattern for `airflow` if not provided in `settings.py`
     if "airflow" not in context.config_loader.config_patterns.keys():
         context.config_loader.config_patterns.update(  # pragma: no cover
@@ -43,11 +46,13 @@ def _load_config(context: KedroContext, pipeline_name: str) -> dict[str, Any]:
 
     # Load the config
     try:
-        config_airflow = context.config_loader["airflow"]
+        return context.config_loader["airflow"]
     except MissingConfigException:
         # File does not exist
         return {}
 
+
+def _get_pipeline_config(config_airflow: dict, params: dict, pipeline_name: str):
     dag_config = {}
     # Load the default config if specified
     if "default" in config_airflow:
@@ -55,13 +60,23 @@ def _load_config(context: KedroContext, pipeline_name: str) -> dict[str, Any]:
     # Update with pipeline-specific config if present
     if pipeline_name in config_airflow:
         dag_config.update(config_airflow[pipeline_name])
+
+    # Update with params if provided
+    dag_config.update(params)
     return dag_config
 
 
 @airflow_commands.command()
 @click.option(
-    "-p", "--pipeline", "pipeline_name", default="__default__", help=PIPELINE_ARG_HELP
+    "-p",
+    "--pipeline",
+    "--pipelines",
+    "pipeline_names",
+    multiple=True,
+    default=("__default__",),
+    help=PIPELINE_ARG_HELP,
 )
+@click.option("--all", "convert_all", is_flag=True, help=ALL_ARG_HELP)
 @click.option("-e", "--env", default="local", help=ENV_HELP)
 @click.option(
     "-t",
@@ -90,21 +105,24 @@ def _load_config(context: KedroContext, pipeline_name: str) -> dict[str, Any]:
 @click.pass_obj
 def create(
     metadata: ProjectMetadata,
-    pipeline_name,
+    pipeline_names,
     env,
     target_path,
     jinja_file,
     params,
+    convert_all: bool,
 ):  # pylint: disable=too-many-locals,too-many-arguments
     """Create an Airflow DAG for a project"""
+    if convert_all and pipeline_names != ("__default__",):
+        raise click.BadParameter(
+            "The `--all` and `--pipeline` options are mutually exclusive."
+        )
+
     project_path = Path.cwd().resolve()
     bootstrap_project(project_path)
     with KedroSession.create(project_path=project_path, env=env) as session:
         context = session.load_context()
-        dag_config = _load_config(context, pipeline_name)
-
-        # Update with params if provided
-        dag_config.update(params)
+        config_airflow = _load_config(context)
 
         jinja_file = Path(jinja_file).resolve()
         loader = jinja2.FileSystemLoader(jinja_file.parent)
@@ -112,57 +130,52 @@ def create(
         jinja_env.filters["slugify"] = slugify
         template = jinja_env.get_template(jinja_file.name)
 
+        dags_folder = Path(target_path)
+        # Ensure that the DAGs folder exists
+        dags_folder.mkdir(parents=True, exist_ok=True)
+        secho(f"Location of the Airflow DAG folder: {target_path!s}", fg="green")
+
         package_name = metadata.package_name
-        dag_filename = (
-            f"{package_name}_dag.py"
-            if pipeline_name == "__default__"
-            else f"{package_name}_{pipeline_name}_dag.py"
-        )
-
-        target_path = Path(target_path)
-        target_path = target_path / dag_filename
-
-        target_path.parent.mkdir(parents=True, exist_ok=True)
-
-        pipeline = pipelines.get(pipeline_name)
-        if pipeline is None:
-            raise KedroCliError(f"Pipeline {pipeline_name} not found.")
-
-        dependencies = defaultdict(list)
-        for node, parent_nodes in pipeline.node_dependencies.items():
-            for parent in parent_nodes:
-                dependencies[parent].append(node)
-
-        template.stream(
-            dag_name=package_name,
-            dependencies=dependencies,
-            env=env,
-            pipeline_name=pipeline_name,
-            package_name=package_name,
-            pipeline=pipeline,
-            **dag_config,
-        ).dump(str(target_path))
-
-        secho("")
-        secho("An Airflow DAG has been generated in:", fg="green")
-        secho(str(target_path))
-        secho("This file should be copied to your Airflow DAG folder.", fg="yellow")
-        secho(
-            "The Airflow configuration can be customized by editing this file.",
-            fg="green",
-        )
-        secho("")
-        secho(
-            "This file also contains the path to the config directory, this directory will need to "
-            "be available to Airflow and any workers.",
-            fg="yellow",
-        )
-        secho("")
-        secho(
-            "Additionally all data sets must have an entry in the data catalog.",
-            fg="yellow",
-        )
-        secho(
-            "And all local paths in both the data catalog and log config must be absolute paths.",
-            fg="yellow",
-        )
+
+        if convert_all:
+            # Convert all pipelines
+            conversion_pipelines = pipelines
+        else:
+            conversion_pipelines = {
+                pipeline_name: pipelines.get(pipeline_name)
+                for pipeline_name in pipeline_names
+            }
+
+        # Convert selected pipelines
+        for name, pipeline in conversion_pipelines.items():
+            dag_config = _get_pipeline_config(config_airflow, params, name)
+
+            if pipeline is None:
+                raise KedroCliError(f"Pipeline {name} not found.")
+
+            # Obtain the file name
+            dag_filename = dags_folder / (
+                f"{package_name}_dag.py"
+                if name == "__default__"
+                else f"{package_name}_{name}_dag.py"
+            )
+
+            dependencies = defaultdict(list)
+            for node, parent_nodes in pipeline.node_dependencies.items():
+                for parent in parent_nodes:
+                    dependencies[parent].append(node)
+
+            template.stream(
+                dag_name=package_name,
+                dependencies=dependencies,
+                env=env,
+                pipeline_name=name,
+                package_name=package_name,
+                pipeline=pipeline,
+                **dag_config,
+            ).dump(str(dag_filename))
+
+            secho(
+                f"Converted pipeline `{name}` to Airflow DAG in the file `{dag_filename.name}`",
+                fg="green",
+            )
diff --git a/kedro-airflow/tests/test_plugin.py b/kedro-airflow/tests/test_plugin.py
index 4b67ff840..e997129ec 100644
--- a/kedro-airflow/tests/test_plugin.py
+++ b/kedro-airflow/tests/test_plugin.py
@@ -226,3 +226,42 @@ def test_create_airflow_dag_nonexistent_pipeline(cli_runner, metadata):
         "kedro.framework.cli.utils.KedroCliError: Pipeline de not found."
         in result.stdout
     )
+
+
+def test_create_airflow_all_dags(cli_runner, metadata):
+    command = ["airflow", "create", "--all"]
+    result = cli_runner.invoke(commands, command, obj=metadata)
+
+    assert result.exit_code == 0, (result.exit_code, result.stdout)
+    print(result.stdout)
+
+    for dag_name, pipeline_name in [
+        ("hello_world", "__default__"),
+        ("hello_world", "ds"),
+    ]:
+        dag_file = (
+            Path.cwd()
+            / "airflow_dags"
+            / (
+                f"{dag_name}_dag.py"
+                if pipeline_name == "__default__"
+                else f"{dag_name}_{pipeline_name}_dag.py"
+            )
+        )
+        assert dag_file.exists()
+
+        expected_airflow_dag = 'tasks["node0"] >> tasks["node1"]'
+        with dag_file.open(encoding="utf-8") as f:
+            dag_code = [line.strip() for line in f.read().splitlines()]
+        assert expected_airflow_dag in dag_code
+        dag_file.unlink()
+
+
+def test_create_airflow_all_and_pipeline(cli_runner, metadata):
+    command = ["airflow", "create", "--all", "-p", "ds"]
+    result = cli_runner.invoke(commands, command, obj=metadata)
+    assert result.exit_code == 2
+    assert (
+        "Error: Invalid value: The `--all` and `--pipeline` options are mutually exclusive."
+        in result.stdout
+    )

From cccb2e0c3973ceea98a2b8217cbbed415d270299 Mon Sep 17 00:00:00 2001
From: Simon Brugman
Date: Mon, 9 Oct 2023 14:00:41 +0200
Subject: [PATCH 2/4] docs: release docs

Signed-off-by: Simon Brugman
---
 kedro-airflow/RELEASE.md | 2 --
 1 file changed, 2 deletions(-)

diff --git a/kedro-airflow/RELEASE.md b/kedro-airflow/RELEASE.md
index 7eca5fc3d..32f705069 100755
--- a/kedro-airflow/RELEASE.md
+++ b/kedro-airflow/RELEASE.md
@@ -1,7 +1,5 @@
 # Upcoming Release
 * Added support for Python 3.11
-
-# Release 0.6.1
 * Added the `--all` CLI argument to `kedro-airflow` to convert all registered pipelines at once.
 * Simplified the output of the `kedro airflow create` command.
 

From e3dec7c4b2683e8641a6c9926681afa94be23e4e Mon Sep 17 00:00:00 2001
From: Simon Brugman
Date: Mon, 9 Oct 2023 14:27:40 +0200
Subject: [PATCH 3/4] fix: backwards compatibility

Signed-off-by: Simon Brugman
---
 kedro-airflow/RELEASE.md              |  1 +
 kedro-airflow/kedro_airflow/plugin.py | 13 +++++++++----
 2 files changed, 10 insertions(+), 4 deletions(-)

diff --git a/kedro-airflow/RELEASE.md b/kedro-airflow/RELEASE.md
index 32f705069..e7ab78695 100755
--- a/kedro-airflow/RELEASE.md
+++ b/kedro-airflow/RELEASE.md
@@ -2,6 +2,7 @@
 * Added support for Python 3.11
 * Added the `--all` CLI argument to `kedro-airflow` to convert all registered pipelines at once.
 * Simplified the output of the `kedro airflow create` command.
+* Fixed compatibility of `kedro-airflow` with older versions of the config loaders (`kedro<=0.18.2`).
 
 ## Community contributions
 Many thanks to the following Kedroids for contributing PRs to this release:
diff --git a/kedro-airflow/kedro_airflow/plugin.py b/kedro-airflow/kedro_airflow/plugin.py
index ba998dabc..d05cb4cea 100644
--- a/kedro-airflow/kedro_airflow/plugin.py
+++ b/kedro-airflow/kedro_airflow/plugin.py
@@ -36,17 +36,22 @@ def airflow_commands():
 
 
 def _load_config(context: KedroContext) -> dict[str, Any]:
+    # Backwards compatibility for ConfigLoader versions that do not support `config_patterns`
+    config_loader = context.config_loader
+    if not hasattr(config_loader, "config_patterns"):
+        return config_loader.get("airflow*", "airflow/**")
+    
     # Set the default pattern for `airflow` if not provided in `settings.py`
-    if "airflow" not in context.config_loader.config_patterns.keys():
-        context.config_loader.config_patterns.update(  # pragma: no cover
+    if "airflow" not in config_loader.config_patterns.keys():
+        config_loader.config_patterns.update(  # pragma: no cover
             {"airflow": ["airflow*", "airflow/**"]}
         )
 
-    assert "airflow" in context.config_loader.config_patterns.keys()
+    assert "airflow" in config_loader.config_patterns.keys()
 
     # Load the config
     try:
-        return context.config_loader["airflow"]
+        return config_loader["airflow"]
     except MissingConfigException:
         # File does not exist
         return {}

From d7255d6c327e89ce9e8268124726dceebabcd5a7 Mon Sep 17 00:00:00 2001
From: Simon Brugman
Date: Mon, 9 Oct 2023 14:50:05 +0200
Subject: [PATCH 4/4] test: add test for backwards compatibility

Signed-off-by: Simon Brugman
---
 kedro-airflow/kedro_airflow/plugin.py |  2 +-
 kedro-airflow/tests/test_plugin.py    | 24 +++++++++++++++++++++++-
 2 files changed, 24 insertions(+), 2 deletions(-)

diff --git a/kedro-airflow/kedro_airflow/plugin.py b/kedro-airflow/kedro_airflow/plugin.py
index d05cb4cea..cb20a9d38 100644
--- a/kedro-airflow/kedro_airflow/plugin.py
+++ b/kedro-airflow/kedro_airflow/plugin.py
@@ -40,7 +40,7 @@ def _load_config(context: KedroContext) -> dict[str, Any]:
     config_loader = context.config_loader
     if not hasattr(config_loader, "config_patterns"):
         return config_loader.get("airflow*", "airflow/**")
-    
+
     # Set the default pattern for `airflow` if not provided in `settings.py`
     if "airflow" not in config_loader.config_patterns.keys():
         config_loader.config_patterns.update(  # pragma: no cover
diff --git a/kedro-airflow/tests/test_plugin.py b/kedro-airflow/tests/test_plugin.py
index 1d282f0c3..4c11efd22 100644
--- a/kedro-airflow/tests/test_plugin.py
+++ b/kedro-airflow/tests/test_plugin.py
@@ -5,8 +5,11 @@
 import pytest
 import yaml
+from kedro.config import ConfigLoader
+from kedro.framework.context import KedroContext
+from pluggy import PluginManager
 
-from kedro_airflow.plugin import commands
+from kedro_airflow.plugin import _load_config, commands
 
 
 @pytest.mark.parametrize(
@@ -264,3 +267,22 @@ def test_create_airflow_all_and_pipeline(cli_runner, metadata):
         "Error: Invalid value: The `--all` and `--pipeline` options are mutually exclusive."
in result.stdout ) + + +def test_config_loader_backwards_compatibility(cli_runner, metadata): + # Emulate ConfigLoader in kedro <= 0.18.2 + conf_source = Path.cwd() / "conf" + config_loader = ConfigLoader(conf_source=conf_source) + del config_loader.config_patterns + context = KedroContext( + config_loader=config_loader, + hook_manager=PluginManager(project_name=metadata.project_name), + package_name=metadata.package_name, + project_path=metadata.project_path, + ) + + config = _load_config(context) + assert config == { + "default": {"owner": "again someone else"}, + "ds": {"owner": "finally someone else"}, + }
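
---

A quick sketch of the per-pipeline configuration precedence that `_get_pipeline_config` applies in the conversion loop above. The keys and values here are made-up examples; only the merge order is taken from the patch: the shared `default` block first, then the pipeline-specific block, then any `--params` given on the command line.

```python
# Hypothetical contents of an airflow.yml after loading; keys are illustrative only.
config_airflow = {
    "default": {"owner": "airflow", "retries": 1},
    "ds": {"owner": "data-science"},
}
params = {"retries": 3}  # e.g. values supplied via --params on the CLI

# Merge order per pipeline: "default" block, then pipeline block, then params.
dag_config = {}
dag_config.update(config_airflow.get("default", {}))
dag_config.update(config_airflow.get("ds", {}))
dag_config.update(params)

assert dag_config == {"owner": "data-science", "retries": 3}
```

Because a fresh `dag_config` is built for every pipeline inside the loop, `kedro airflow create --all` renders each DAG file with its own merged configuration rather than sharing one mutated dict across pipelines.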