Fix local airflow example #366

Merged · 3 commits · Feb 2, 2022
10 changes: 4 additions & 6 deletions examples/airflow_local/README.md
@@ -16,9 +16,7 @@ In order to run this example, you need to install and initialize ZenML and Airflow
pip install zenml

# install ZenML integrations
-zenml integration install airflow
-zenml integration install tensorflow
-zenml integration install sklearn
+zenml integration install airflow sklearn

# pull example
zenml example pull airflow_local
@@ -43,7 +41,7 @@ zenml stack set airflow_stack
ZenML takes care of configuring Airflow, all we need to do is run:

```bash
-zenml orchestrator up
+zenml stack up
```

This will bootstrap Airflow, start up all the necessary components and run them in the background.
@@ -69,10 +67,10 @@ python run.py
After a few seconds, you should be able to see the executed dag [here](http://0.0.0.0:8080/tree?dag_id=mnist_pipeline)

### Clean up
-In order to clean up, shut down the airflow orchestrator and delete the remaining zenml references.
+In order to clean up, tear down the airflow stack and delete the remaining zenml references.

```shell
-zenml orchestrator down
+zenml stack down --force
rm -rf zenml_examples
```
## SuperQuick `airflow_local` run
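Read together, the updated README describes a single stack-based flow. The sketch below simply strings the commands from this diff into one sequence; the `cd` path and the earlier stack-registration step are assumptions about README lines not shown in these hunks.

```bash
# Assembled from the commands in this diff; the cd path and the earlier
# stack-registration step are assumptions about README lines not shown here.
pip install zenml
zenml integration install airflow sklearn

zenml example pull airflow_local
cd zenml_examples/airflow_local      # assumed layout created by `zenml example pull`

zenml stack set airflow_stack        # stack registered in earlier README steps
zenml stack up                       # bootstraps Airflow and runs its components in the background

python run.py                        # the executed DAG appears at http://0.0.0.0:8080

# clean up again
zenml stack down --force
cd ../.. && rm -rf zenml_examples
```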
26 changes: 20 additions & 6 deletions examples/airflow_local/setup.sh
@@ -2,14 +2,28 @@

set -Eeo pipefail

+setup_stack () {
+  zenml orchestrator register airflow_orchestrator --type=airflow || \
+    msg "${WARNING}Reusing preexisting orchestrator ${NOFORMAT}airflow_orchestrator"
+  zenml stack register local_airflow_stack \
+    -m local_metadata_store \
+    -a local_artifact_store \
+    -o airflow_orchestrator || \
+    msg "${WARNING}Reusing preexisting stack ${NOFORMAT}local_airflow_stack"
+
+  zenml stack set local_airflow_stack
+
+  zenml stack up
+}
+
pre_run () {
-  zenml integration install airflow
-  zenml integration install sklearn
-  zenml integration install tensorflow
+  zenml integration install airflow sklearn
+
+  setup_stack
}

pre_run_forced () {
-  zenml integration install airflow -f
-  zenml integration install sklearn -f
-  zenml integration install tensorflow -f
+  zenml integration install airflow sklearn -f
+
+  setup_stack
}
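With this change the script, rather than the README, registers and provisions the local Airflow stack. How these functions are invoked is not part of the diff; a hypothetical manual invocation, assuming the script is normally sourced by the ZenML example tooling and that `msg`, `WARNING` and `NOFORMAT` are defined there, could look like this:

```bash
# Hypothetical manual invocation (assumption: the example tooling normally
# sources this script and provides msg/WARNING/NOFORMAT).
msg() { echo "$*"; }        # minimal stub so the fallback branches don't fail
WARNING="" NOFORMAT=""

source examples/airflow_local/setup.sh
pre_run                     # installs the airflow + sklearn integrations,
                            # then registers, activates and provisions the stack
```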
Changes to the `AirflowOrchestrator` implementation
@@ -15,7 +15,7 @@
import datetime
import os
import time
-from typing import TYPE_CHECKING, Any, Dict, Optional
+from typing import TYPE_CHECKING, Any, Dict

from pydantic import root_validator

@@ -29,6 +29,7 @@
from zenml.logger import get_logger
from zenml.orchestrators import BaseOrchestrator
from zenml.orchestrators.utils import create_tfx_pipeline
+from zenml.repository import Repository
from zenml.stack.stack_component_class_registry import (
register_stack_component_class,
)
@@ -53,7 +54,6 @@ class AirflowOrchestrator(BaseOrchestrator):
"""Orchestrator responsible for running pipelines using Airflow."""

airflow_home: str = ""
-airflow_config: Optional[Dict[str, Any]] = {}
schedule_interval_minutes: int = 1
supports_local_execution = True
supports_remote_execution = False
@@ -244,7 +244,7 @@ def provision(self) -> None:
command.run,
pid_file=self.pid_file,
log_file=self.log_file,
-working_directory=zenml.io.utils.get_zenml_dir(),
+working_directory=str(Repository().root),
)
while not self.is_running:
# Wait until the daemon started all the relevant airflow
@@ -258,7 +258,7 @@
"want to start it manually, use the commands described in the "
"official Airflow quickstart guide for running Airflow locally."
)
-self.down()
+self.deprovision()

def deprovision(self) -> None:
"""Stops the airflow daemon if necessary and tears down resources."""
@@ -279,14 +279,14 @@ def run_pipeline(
Returns:
An Airflow DAG object that corresponds to the ZenML pipeline.
"""
-self.airflow_config = {
+airflow_config = {
"schedule_interval": datetime.timedelta(
minutes=self.schedule_interval_minutes
),
# We set this in the past and turn catchup off and then it works
"start_date": datetime.datetime(2019, 1, 1),
}

-runner = AirflowDagRunner(AirflowPipelineConfig(self.airflow_config))
+runner = AirflowDagRunner(AirflowPipelineConfig(airflow_config))
tfx_pipeline = create_tfx_pipeline(pipeline, stack=stack)
return runner.run(tfx_pipeline, run_name=runtime_configuration.run_name)
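The orchestrator changes are internal: a failed startup now cleans up through the component's own `deprovision()` method, the Airflow daemon runs from the active `Repository()` root, and the schedule settings are built per run instead of being stored on the component. From the CLI the flow is unchanged; a hedged sanity check after `zenml stack up`, assuming the Airflow webserver exposes its usual `/health` endpoint on port 8080, might be:

```bash
# Hedged sanity check (not part of the PR): after `zenml stack up`, the local
# Airflow webserver started by this example should answer on port 8080.
curl -sf http://0.0.0.0:8080/health && echo "Airflow webserver is up"

# Tear the example stack down again, as in the updated README.
zenml stack down --force
```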