From 2192d3c75938b6db9161c3f503be3d49229d48f5 Mon Sep 17 00:00:00 2001 From: Michael Schuster Date: Thu, 27 Jan 2022 13:39:24 +0100 Subject: [PATCH 1/3] Fix airflow example --- examples/airflow_local/setup.sh | 26 ++++++++++++++----- .../orchestrators/airflow_orchestrator.py | 12 ++++----- 2 files changed, 26 insertions(+), 12 deletions(-) diff --git a/examples/airflow_local/setup.sh b/examples/airflow_local/setup.sh index 3e1c37f9ffc..5ccd1c66200 100644 --- a/examples/airflow_local/setup.sh +++ b/examples/airflow_local/setup.sh @@ -2,14 +2,28 @@ set -Eeo pipefail +setup_stack () { + zenml orchestrator register airflow_orchestrator --type=airflow || \ + msg "${WARNING}Reusing preexisting orchestrator ${NOFORMAT}airflow_orchestrator" + zenml stack register local_airflow_stack \ + -m local_metadata_store \ + -a local_artifact_store \ + -o airflow_orchestrator \ + msg "${WARNING}Reusing preexisting stack ${NOFORMAT}local_airflow_stack" + + zenml stack set local_airflow_stack + + zenml stack up +} + pre_run () { - zenml integration install airflow - zenml integration install sklearn - zenml integration install tensorflow + zenml integration install airflow sklearn + + setup_stack } pre_run_forced () { - zenml integration install airflow -f - zenml integration install sklearn -f - zenml integration install tensorflow -f + zenml integration install airflow sklearn -f + + setup_stack } \ No newline at end of file diff --git a/src/zenml/integrations/airflow/orchestrators/airflow_orchestrator.py b/src/zenml/integrations/airflow/orchestrators/airflow_orchestrator.py index a5ff100cbd9..14cbcd26c8c 100644 --- a/src/zenml/integrations/airflow/orchestrators/airflow_orchestrator.py +++ b/src/zenml/integrations/airflow/orchestrators/airflow_orchestrator.py @@ -15,7 +15,7 @@ import datetime import os import time -from typing import TYPE_CHECKING, Any, Dict, Optional +from typing import TYPE_CHECKING, Any, Dict from pydantic import root_validator @@ -29,6 +29,7 @@ from zenml.logger import get_logger from zenml.orchestrators import BaseOrchestrator from zenml.orchestrators.utils import create_tfx_pipeline +from zenml.repository import Repository from zenml.stack.stack_component_class_registry import ( register_stack_component_class, ) @@ -53,7 +54,6 @@ class AirflowOrchestrator(BaseOrchestrator): """Orchestrator responsible for running pipelines using Airflow.""" airflow_home: str = "" - airflow_config: Optional[Dict[str, Any]] = {} schedule_interval_minutes: int = 1 supports_local_execution = True supports_remote_execution = False @@ -244,7 +244,7 @@ def provision(self) -> None: command.run, pid_file=self.pid_file, log_file=self.log_file, - working_directory=zenml.io.utils.get_zenml_dir(), + working_directory=str(Repository().root), ) while not self.is_running: # Wait until the daemon started all the relevant airflow @@ -258,7 +258,7 @@ def provision(self) -> None: "want to start it manually, use the commands described in the " "official Airflow quickstart guide for running Airflow locally." ) - self.down() + self.deprovision() def deprovision(self) -> None: """Stops the airflow daemon if necessary and tears down resources.""" @@ -279,7 +279,7 @@ def run_pipeline( Returns: An Airflow DAG object that corresponds to the ZenML pipeline. """ - self.airflow_config = { + airflow_config = { "schedule_interval": datetime.timedelta( minutes=self.schedule_interval_minutes ), @@ -287,6 +287,6 @@ def run_pipeline( "start_date": datetime.datetime(2019, 1, 1), } - runner = AirflowDagRunner(AirflowPipelineConfig(self.airflow_config)) + runner = AirflowDagRunner(AirflowPipelineConfig(airflow_config)) tfx_pipeline = create_tfx_pipeline(pipeline, stack=stack) return runner.run(tfx_pipeline, run_name=runtime_configuration.run_name) From 672d982511535e6cf2a40c53305b81afdcf8d3bc Mon Sep 17 00:00:00 2001 From: Michael Schuster Date: Thu, 27 Jan 2022 13:49:23 +0100 Subject: [PATCH 2/3] Fix example setup shell script --- examples/airflow_local/setup.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/airflow_local/setup.sh b/examples/airflow_local/setup.sh index 5ccd1c66200..59c9a7737d1 100644 --- a/examples/airflow_local/setup.sh +++ b/examples/airflow_local/setup.sh @@ -8,7 +8,7 @@ setup_stack () { zenml stack register local_airflow_stack \ -m local_metadata_store \ -a local_artifact_store \ - -o airflow_orchestrator \ + -o airflow_orchestrator || \ msg "${WARNING}Reusing preexisting stack ${NOFORMAT}local_airflow_stack" zenml stack set local_airflow_stack From 2004d2e71432f4aed3ab0103d524ab26b6127722 Mon Sep 17 00:00:00 2001 From: Michael Schuster Date: Fri, 28 Jan 2022 11:55:14 +0100 Subject: [PATCH 3/3] [ci skip] Update airflow example readme --- examples/airflow_local/README.md | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/examples/airflow_local/README.md b/examples/airflow_local/README.md index ea0fceb6754..831418898e0 100644 --- a/examples/airflow_local/README.md +++ b/examples/airflow_local/README.md @@ -16,9 +16,7 @@ In order to run this example, you need to install and initialize ZenML and Airfl pip install zenml # install ZenML integrations -zenml integration install airflow -zenml integration install tensorflow -zenml integration install sklearn +zenml integration install airflow sklearn # pull example zenml example pull airflow_local @@ -43,7 +41,7 @@ zenml stack set airflow_stack ZenML takes care of configuring Airflow, all we need to do is run: ```bash -zenml orchestrator up +zenml stack up ``` This will bootstrap Airflow, start up all the necessary components and run them in the background. @@ -69,10 +67,10 @@ python run.py After a few seconds, you should be able to see the executed dag [here](http://0.0.0.0:8080/tree?dag_id=mnist_pipeline) ### Clean up -In order to clean up, shut down the airflow orchestrator and delete the remaining zenml references. +In order to clean up, tear down the airflow stack and delete the remaining zenml references. ```shell -zenml orchestrator down +zenml stack down --force rm -rf zenml_examples ``` ## SuperQuick `airflow_local` run