diff --git a/docs/book/features/integrations.md b/docs/book/features/integrations.md index f5942daf138..de3e1ad51b3 100644 --- a/docs/book/features/integrations.md +++ b/docs/book/features/integrations.md @@ -19,32 +19,32 @@ ML workflows, from [`scikit-learn`](https://scikit-learn.org/stable/) to [`PyTor These are the third-party integrations that ZenML currently supports: -| Integration | Status | Type | Implementation Notes | Example | -| ----------- | ------ | ---- | -------------------- | ------- | -| Apache Airflow | ✅ | Orchestrator | Works for local environment | [airflow_local](https://github.com/zenml-io/zenml/tree/main/examples/airflow_local) | -| Apache Beam | ✅ | Distributed Processing | | | -| AWS | ✅ | Cloud | Use S3 buckets as ZenML artifact stores | | -| Azure | ✅ | Cloud | Use Azure Blob Storage buckets as ZenML artifact stores | | -| BentoML | ⛏ | Cloud | Looking for community implementors | | -| Dash | ✅ | Visualizer | For Pipeline and PipelineRun visualization objects. | [lineage](https://github.com/zenml-io/zenml/tree/main/examples/lineage) | -| Evidently | ✅ | Monitoring | Allows for visualization of drift as well as export of a `Profile` object | [drift_detection](https://github.com/zenml-io/zenml/tree/release/0.5.7/examples/drift_detection) | -| Facets | ✅ | Visualizer | | [statistics](https://github.com/zenml-io/zenml/tree/main/examples/statistics) | -| GCP | ✅ | Cloud | Use Google Cloud Storage buckets as ZenML artifact stores | | -| Graphviz | ✅ | Visualizer | For Pipeline and PipelineRun visualization objects. | [dag_visualizer](https://github.com/zenml-io/zenml/tree/main/examples/dag_visualizer) | -| Great Expectations | ⛏ | Data Validation | Looking for community implementors | | -| KServe | ⛏ | Inference | Looking for community implementors | | -| Kubeflow | ✅ | Orchestrator | Either full Kubeflow or Kubeflow Pipelines. Works for local environments currently. 
| [kubeflow](https://github.com/zenml-io/zenml/tree/main/examples/kubeflow) | -| MLflow Tracking | ✅ | Experiment Tracking | Tracks your pipelines and your training runs. | [mlflow](https://github.com/zenml-io/zenml/tree/main/examples/mlflow_tracking) | -| MLflow Deployment | ✅ | Deployment | Deploys models with the MLflow scoring server. | [mlflow](https://github.com/zenml-io/zenml/tree/main/examples/mlflow_deployment) | -| numpy | ✅ | Exploration | | | -| pandas | ✅ | Exploration | | | -| Plotly | ✅ | Visualizer | For Pipeline and PipelineRun visualization objects. | [lineage](https://github.com/zenml-io/zenml/tree/main/examples/lineage) | -| PyTorch | ✅ | Training | | | -| PyTorch Lightning | ✅ | Training | | | -| scikit-learn | ✅ | Training | | [caching chapter](https://docs.zenml.io/v/docs/guides/functional-api/caching) | -| Seldon | ⛏ | Cloud | Looking for community implementors | | -| Tensorflow | ✅ | Training | | [quickstart](https://github.com/zenml-io/zenml/tree/main/examples/quickstart) | -| whylogs | ✅ | Logging | Integration fully implemented for data logging | [whylogs](https://github.com/zenml-io/zenml/tree/main/examples/whylogs) | +| Integration | Status | Type | Implementation Notes | Example | +| ----------- | ------ |------------------------|---------------------|--------------------------------------------------------------------------------------------------| +| Apache Airflow | ✅ | Orchestrator | Works for local environment | [airflow_local](https://github.com/zenml-io/zenml/tree/main/examples/airflow_local) | +| Apache Beam | ✅ | Distributed Processing | | | +| AWS | ✅ | Cloud | Use S3 buckets as ZenML artifact stores | | +| Azure | ✅ | Cloud | Use Azure Blob Storage buckets as ZenML artifact stores | | +| BentoML | ⛏ | Cloud | Looking for community implementors | | +| Dash | ✅ | Visualizer | For Pipeline and PipelineRun visualization objects. 
| [lineage](https://github.com/zenml-io/zenml/tree/main/examples/lineage) | +| Evidently | ✅ | Monitoring | Allows for visualization of drift as well as export of a `Profile` object | [drift_detection](https://github.com/zenml-io/zenml/tree/release/0.5.7/examples/drift_detection) | +| Facets | ✅ | Visualizer | | [statistics](https://github.com/zenml-io/zenml/tree/main/examples/statistics) | +| GCP | ✅ | Cloud | Use Google Cloud Storage buckets as ZenML artifact stores | | +| Graphviz | ✅ | Visualizer | For Pipeline and PipelineRun visualization objects. | [dag_visualizer](https://github.com/zenml-io/zenml/tree/main/examples/dag_visualizer) | +| Great Expectations | ⛏ | Data Validation | Looking for community implementors | | +| KServe | ⛏ | Inference | Looking for community implementors | | +| Kubeflow | ✅ | Orchestrator | Either full Kubeflow or Kubeflow Pipelines. Works for local environments currently. | [kubeflow](https://github.com/zenml-io/zenml/tree/main/examples/kubeflow) | +| MLflow Tracking | ✅ | Experiment Tracking | Tracks your pipelines and your training runs. | [mlflow](https://github.com/zenml-io/zenml/tree/main/examples/mlflow_tracking) | +| MLflow Deployment | ✅ | Deployment | Deploys models with the MLflow scoring server. | [mlflow](https://github.com/zenml-io/zenml/tree/main/examples/mlflow_deployment) | +| numpy | ✅ | Exploration | | | +| pandas | ✅ | Exploration | | | +| Plotly | ✅ | Visualizer | For Pipeline and PipelineRun visualization objects. | [lineage](https://github.com/zenml-io/zenml/tree/main/examples/lineage) | +| PyTorch | ✅ | Training | | | +| PyTorch Lightning | ✅ | Training | | | +| scikit-learn | ✅ | Training | | [caching chapter](https://docs.zenml.io/v/docs/guides/functional-api/caching) | +| Seldon | ⛏ | Cloud | Looking for community implementors | | +| Tensorflow | ✅ | Training, Visualizer | Tensorboard support | [quickstart](https://github.com/zenml-io/zenml/tree/main/examples/quickstart). 
[kubeflow](https://github.com/zenml-io/zenml/tree/main/examples/kubeflow) | +| whylogs | ✅ | Logging | Integration fully implemented for data logging | [whylogs](https://github.com/zenml-io/zenml/tree/main/examples/whylogs) | ✅ means the integration is already implemented. ⛏ means we are looking to implement the integration soon. diff --git a/docs/book/features/services.md b/docs/book/features/services.md new file mode 100644 index 00000000000..b1342784e0b --- /dev/null +++ b/docs/book/features/services.md @@ -0,0 +1,52 @@ +--- +description: ZenML pipelines are fully-aware of external, longer-lived services. +--- + +# Interact with external services + +ZenML interacts with external systems (e.g. prediction services, monitoring systems, visualization services) with a so-called `Service` abstraction. +The concrete implementation of this abstraction deals with functionality concerning the life-cycle management and tracking of an external service (e.g. process, container, +Kubernetes deployment etc.). + +One concrete example of a `Service` is the built-in `LocalDaemonService`, a service represented by a local daemon +process. This extends the base service class with functionality concerning the life-cycle management and tracking +of external services implemented as local daemon processes. + +Services can be passed through steps like any other object, and used to interact with the external systems that +they represent: + +```python +@step +def my_step(my_service: MyService) -> ...: + if not my_service.is_running: + my_service.start() # starts service + my_service.stop() # stops service +``` + +You can see full examples of using services here: + +* Visualizing training with tensorboard with the [Kubeflow tensorboard example](https://github.com/zenml-io/zenml/tree/main/examples/kubeflow). +* Continuous training and continuous deployment setting with the [MLflow deployment example](https://github.com/zenml-io/zenml/tree/main/examples/mlflow_deployment). 
+ +## Example services + +One concrete example of a `Service` implementation is the `TensorboardService`. +It enables visualizing [Tensorboard](https://www.tensorflow.org/tensorboard) logs easily by managing a local Tensorboard server. + +```python +service = TensorboardService( + TensorboardServiceConfig( + logdir=logdir, + ) +) + +# start the service +service.start(timeout=20) + +# stop the service +service.stop() +``` + +This couples nicely with the `TensorboardVisualizer` to visualize Tensorboard logs. + +Another example of a service can be found in the [continuous training and deployment guide](continous-training-and-deployment.md). \ No newline at end of file diff --git a/docs/book/toc.md b/docs/book/toc.md index 8b3a61f026a..404c143d8c8 100644 --- a/docs/book/toc.md +++ b/docs/book/toc.md @@ -23,9 +23,8 @@ * [Caching](guides/class-based-api/caching.md) * [Materialize artifacts](guides/class-based-api/materialize-artifacts.md) * [General](guides/common-usecases/index.md) - * [Post Execution Workflow](guides/common-usecases/post-execution-workflow.md) - * [Naming a pipeline run](guides/common-usecases/naming-a-pipeline-run.md) * [Fetch pipelines and artifacts after running them](guides/common-usecases/post-execution-workflow.md) + * [Naming a pipeline run](guides/common-usecases/naming-a-pipeline-run.md) * [Use `zenml example run`](guides/common-usecases/zenml-example-cli.md) * [Creating a custom `materializer`](guides/common-usecases/custom-materializer.md) * [Fetching historic runs using `StepContext`](guides/common-usecases/historic-runs.md) @@ -33,14 +32,15 @@ ## Features +* [Integrations](features/integrations.md) * [Pipeline Configuration](features/pipeline-configuration.md) * [Caching](features/caching.md) -* [Integrations](features/integrations.md) * [Standardized Artifacts](features/artifacts.md) * [Run Your Pipelines in the Cloud](features/cloud-pipelines/cloud-pipelines.md) * [Cloud-specific guide](features/cloud-pipelines/guide-aws-gcp-azure.md) 
-* [Step Fixtures](features/step-fixtures.md) +* [Convenient Step Fixtures](features/step-fixtures.md) * [Continuous Training and Deployment](features/continous-training-and-deployment.md) +* [Managing External Services](features/services.md) ## Support diff --git a/examples/kubeflow/README.md b/examples/kubeflow/README.md index 6a17a4dee23..fa124061468 100644 --- a/examples/kubeflow/README.md +++ b/examples/kubeflow/README.md @@ -1,35 +1,41 @@ # Deploy pipelines to production using Kubeflow Pipelines -When developing ML models, you probably develop your pipelines on your local machine initially as this allows for -quicker iteration and debugging. However, at a certain point when you are finished with its design, you might want to -transition to a more production-ready setting and deploy the pipeline to a more robust environment. +When developing ML models, you probably develop your pipelines on your local +machine initially as this allows for quicker iteration and debugging. However, +at a certain point when you are finished with its design, you might want to +transition to a more production-ready setting and deploy the pipeline to a more +robust environment. You can also watch a video of this example [here](https://www.youtube.com/watch?v=b5TXRYkdL3w). 
## Pre-requisites -In order to run this example, we have to install a few tools that allow ZenML to spin up a local Kubeflow Pipelines +In order to run this example, we have to install a few tools that allow ZenML to +spin up a local Kubeflow Pipelines setup: -* [K3D](https://k3d.io/v5.2.1/#installation) to spin up a local Kubernetes cluster -* The Kubernetes command-line tool [Kubectl](https://kubernetes.io/docs/tasks/tools/#kubectl) to deploy -Kubeflow Pipelines -* [Docker](https://docs.docker.com/get-docker/) to build docker images that run your pipeline in Kubernetes -pods (**Note**: the local Kubeflow Pipelines deployment requires more than 2 GB of RAM, so if you're using -Docker Desktop make sure to update the resource limits in the preferences) +* [K3D](https://k3d.io/v5.2.1/#installation) to spin up a local Kubernetes +cluster +* The Kubernetes command-line tool [Kubectl](https://kubernetes.io/docs/tasks/tools/#kubectl) +to deploy Kubeflow Pipelines +* [Docker](https://docs.docker.com/get-docker/) to build docker images that run +your pipeline in Kubernetes pods (**Note**: the local Kubeflow Pipelines +deployment requires more than 2 GB of RAM, so if you're using Docker Desktop +make sure to update the resource limits in the preferences) ## Installation -Next, we will install ZenML, get the code for this example and initialize a ZenML repository: +Next, we will install ZenML, get the code for this example and initialize a +ZenML repository: ```bash # Install python dependencies pip install zenml +pip install notebook # if you want to run the example on the notebook # Install ZenML integrations -zenml integration install kubeflow -zenml integration install sklearn +zenml integration install kubeflow tensorflow # Pull the kubeflow example zenml example pull kubeflow @@ -39,17 +45,66 @@ cd zenml_examples/kubeflow zenml init ``` -## Run on a local Kubeflow Pipelines deployment +## Use the notebook +As an alternative to running the commands below, you can also 
simply use the notebook version and see the story unfold there: + +```shell +jupyter notebook +``` + +Otherwise, please continue reading if you want to run it straight in Python scripts. + +## Run on the local machine + +### Run the pipeline + +We can now run the pipeline by simply executing the python script: + +```bash +python run.py +``` + +The script will run the pipeline locally and will start a Tensorboard +server that can be accessed to visualize the information for the trained model. + +Re-running the example with different hyperparameter values will re-train +the model and the Tensorboard server will be updated automatically to include +the new model information, e.g.: + +```shell +python run.py --lr=0.02 +python run.py --epochs=10 +``` + +![Tensorboard 01](assets/tensorboard-01.png) +![Tensorboard 02](assets/tensorboard-02.png) +![Tensorboard 03](assets/tensorboard-03.png) + +### Clean up + +Once you're done experimenting, you can stop the Tensorboard server running +in the background by running the command below. However, you may want to keep +it running if you want to continue on to the next step and run the same +pipeline on a local Kubeflow Pipelines deployment. + +```bash +python run.py --stop-tensorboard +``` + +## Run the same pipeline on a local Kubeflow Pipelines deployment ### Create a local Kubeflow Pipelines Stack -Now with all the installation and initialization out of the way, all that's left to do is configuring our -ZenML [stack](https://docs.zenml.io/core-concepts). For this example, the stack we create consists of the -following four parts: +Now with all the installation and initialization out of the way, all that's left +to do is configuring our ZenML [stack](https://docs.zenml.io/core-concepts). For +this example, the stack we create consists of the following four parts: * The **local artifact store** stores step outputs on your hard disk. 
-* The **local metadata store** stores metadata like the pipeline name and step parameters inside a local SQLite database. -* The docker images that are created to run your pipeline are stored in a local docker **container registry**. -* The **Kubeflow orchestrator** is responsible for running your ZenML pipeline in Kubeflow Pipelines. +* The **local metadata store** stores metadata like the pipeline name and step +parameters inside a local SQLite database. +* The docker images that are created to run your pipeline are stored in a local +docker **container registry**. +* The **Kubeflow orchestrator** is responsible for running your ZenML pipeline +in Kubeflow Pipelines. ```bash # Make sure to create the local registry on port 5000 for it to work @@ -66,13 +121,16 @@ zenml stack set local_kubeflow_stack ``` ### Start up Kubeflow Pipelines locally -ZenML takes care of setting up and configuring the local Kubeflow Pipelines deployment. All we need to do is run: + +ZenML takes care of setting up and configuring the local Kubeflow Pipelines +deployment. All we need to do is run: + ```bash zenml stack up ``` -When the setup is finished, you should see a local URL which you can access in your browser and take a look at the -Kubeflow Pipelines UI. +When the setup is finished, you should see a local URL which you can access in +your browser and take a look at the Kubeflow Pipelines UI. ### Run the pipeline We can now run the pipeline by simply executing the python script: @@ -81,12 +139,41 @@ We can now run the pipeline by simply executing the python script: python run.py ``` -This will build a docker image containing all the necessary python packages and files, push it to the local container -registry and schedule a pipeline run in Kubeflow Pipelines. -Once the script is finished, you should be able to see the pipeline run [here](http://localhost:8080/#/runs). 
+This will build a docker image containing all the necessary python packages and +files, push it to the local container registry and schedule a pipeline run in +Kubeflow Pipelines. Once the script is finished, you should be able to see the +pipeline run [here](http://localhost:8080/#/runs). + +The Tensorboard logs for the model trained in every pipeline run can be viewed +directly in the Kubeflow Pipelines UI by clicking on the "Visualization" tab +and then clicking on the "Open Tensorboard" button. + +![Tensorboard Kubeflow Visualization](assets/tensorboard-kubeflow-vis.png) +![Tensorboard Kubeflow UI](assets/tensorboard-kubeflow-ui.png) + +At the same time, the script will start a local Tensorboard server that can be +accessed to visualize the information for all past and future versions of the +trained model. + +Re-running the example with different hyperparameter values will re-train +the model and the Tensorboard server will be updated automatically to include +the new model information, e.g.: + +```shell +python run.py --lr=0.02 +python run.py --epochs=10 +``` ### Clean up -Once you're done experimenting, you can delete the local Kubernetes cluster and all associated resources by calling: +Once you're done experimenting, you can stop the Tensorboard server running +in the background with the command: + +```bash +python run.py --stop-tensorboard +``` + +You can delete the local Kubernetes cluster and all associated resources by +calling: ```bash zenml stack down diff --git a/examples/kubeflow/ZenML Walkthrough Kubeflow Pipelines.ipynb b/examples/kubeflow/ZenML Walkthrough Kubeflow Pipelines.ipynb index d3e22963b92..ba35d4fd2da 100644 --- a/examples/kubeflow/ZenML Walkthrough Kubeflow Pipelines.ipynb +++ b/examples/kubeflow/ZenML Walkthrough Kubeflow Pipelines.ipynb @@ -43,8 +43,7 @@ "source": [ "# Install the ZenML CLI tool and Tensorflow\n", "!pip install zenml\n", - "!zenml integration install kubeflow -f\n", - "!zenml integration install sklearn 
-f" + "!zenml integration install kubeflow tensorflow" ] }, { @@ -85,7 +84,7 @@ "id": "yQE8PSXDzL-_" }, "source": [ - "The above commands have automatically created a local MLOps stack for you and set it to active:" + "The above commands have automatically created a local MLOps stack for you and set it to active. Let's make sure:" ] }, { @@ -94,7 +93,7 @@ "metadata": {}, "outputs": [], "source": [ - "!zenml stack get" + "!zenml stack set local_stack" ] }, { @@ -130,15 +129,15 @@ }, "outputs": [], "source": [ - "import os\n", "import logging\n", + "import os\n", + "\n", "import numpy as np\n", - "from sklearn.base import ClassifierMixin\n", + "import tensorflow as tf\n", "\n", - "from zenml.integrations.sklearn.helpers.digits import get_digits, get_digits_model\n", + "from zenml.integrations.constants import TENSORFLOW\n", "from zenml.pipelines import pipeline\n", - "from zenml.steps import step\n", - "from zenml.steps.step_output import Output" + "from zenml.steps import BaseStepConfig, Output, StepContext, step" ] }, { @@ -178,10 +177,16 @@ "source": [ "@step\n", "def importer() -> Output(\n", - " X_train=np.ndarray, X_test=np.ndarray, y_train=np.ndarray, y_test=np.ndarray\n", + " X_train=np.ndarray,\n", + " X_test=np.ndarray,\n", + " y_train=np.ndarray,\n", + " y_test=np.ndarray,\n", "):\n", - " \"\"\"Loads the digits array as normal numpy arrays.\"\"\"\n", - " X_train, X_test, y_train, y_test = get_digits()\n", + " \"\"\"Download the MNIST data store it as an artifact\"\"\"\n", + " (X_train, y_train), (\n", + " X_test,\n", + " y_test,\n", + " ) = tf.keras.datasets.mnist.load_data()\n", " return X_train, X_test, y_train, y_test" ] }, @@ -216,7 +221,7 @@ "id": "ma53mucU0yF3" }, "source": [ - "We then add a `trainer` step, that takes the normalized data and trains a sklearn model on the data." + "We then add a `trainer` step, that takes the normalized data and trains a Keras model on the data. The step has an associated `TrainerConfig` step configuration class. 
Also note how we use the `StepContext` to extract the Artifact Store path alongside the output model Artifact where Tensorboard logs are to be stored." ] }, { @@ -227,14 +232,47 @@ }, "outputs": [], "source": [ - "@step(enable_cache=False)\n", + "class TrainerConfig(BaseStepConfig):\n", + " \"\"\"Trainer params\"\"\"\n", + "\n", + " epochs: int = 1\n", + " lr: float = 0.001\n", + "\n", + "@step\n", "def trainer(\n", " X_train: np.ndarray,\n", " y_train: np.ndarray,\n", - ") -> ClassifierMixin:\n", - " \"\"\"Train a simple sklearn classifier for the digits dataset.\"\"\"\n", - " model = get_digits_model()\n", - " model.fit(X_train, y_train)\n", + " context: StepContext,\n", + " config: TrainerConfig,\n", + ") -> tf.keras.Model:\n", + " \"\"\"Train a neural net from scratch to recognize MNIST digits return our\n", + " model or the learner\"\"\"\n", + " model = tf.keras.Sequential(\n", + " [\n", + " tf.keras.layers.Flatten(input_shape=(28, 28)),\n", + " tf.keras.layers.Dense(10, activation=\"relu\"),\n", + " tf.keras.layers.Dense(10),\n", + " ]\n", + " )\n", + "\n", + " log_dir = os.path.join(context.get_output_artifact_uri(), \"logs\")\n", + " tensorboard_callback = tf.keras.callbacks.TensorBoard(\n", + " log_dir=log_dir, histogram_freq=1\n", + " )\n", + "\n", + " model.compile(\n", + " optimizer=tf.keras.optimizers.Adam(config.lr),\n", + " loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),\n", + " metrics=[\"accuracy\"],\n", + " )\n", + "\n", + " model.fit(\n", + " X_train,\n", + " y_train,\n", + " epochs=config.epochs,\n", + " callbacks=[tensorboard_callback],\n", + " )\n", + "\n", " return model" ] }, @@ -255,10 +293,11 @@ "def evaluator(\n", " X_test: np.ndarray,\n", " y_test: np.ndarray,\n", - " model: ClassifierMixin,\n", + " model: tf.keras.Model,\n", ") -> float:\n", " \"\"\"Calculate the accuracy on the test set\"\"\"\n", - " test_acc = model.score(X_test, y_test)\n", + "\n", + " _, test_acc = model.evaluate(X_test, y_test, verbose=2)\n", 
" logging.info(f\"Test accuracy: {test_acc}\")\n", " return test_acc" ] @@ -329,7 +368,8 @@ "base_uri": "https://localhost:8080/" }, "id": "dRzZA406UVVz", - "outputId": "f61e4408-4001-4cc7-ed7d-8472b1c4089f" + "outputId": "f61e4408-4001-4cc7-ed7d-8472b1c4089f", + "scrolled": false }, "outputs": [], "source": [ @@ -347,56 +387,97 @@ { "cell_type": "markdown", "metadata": { - "id": "gotkJdTQz8j2" + "id": "b-JtDHu_z1IX" }, "source": [ - "# Transitioning to Kubeflow Pipelines" + "## Visualize the model with Tensorboard" ] }, { "cell_type": "markdown", "metadata": { - "id": "PMLU4cNW-Ei4" + "id": "NrJA5OSgnydC" }, "source": [ - "We got pretty good results on the digits model that we trained, but at some point we want to get out of this notebook local stack and go to a stack which looks more like production. Here is where the ZenML [Kubeflow Pipelines](https://github.com/kubeflow/pipelines) integration helps!" + "To visualize the model with Tensorboard, make use of the built-in ZenML Tensorboard visualizer, that will automatically start a Tensorboard server in the background." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "colab": { + "base_uri": "https://localhost:8080/" + }, + "id": "dRzZA406UVVz", + "outputId": "f61e4408-4001-4cc7-ed7d-8472b1c4089f" + }, + "outputs": [], + "source": [ + "from zenml.integrations.tensorflow.visualizers import (\n", + " visualize_tensorboard,\n", + " stop_tensorboard_server,\n", + ")\n", + "\n", + "visualize_tensorboard(\n", + " pipeline_name=\"mnist_pipeline\",\n", + " step_name=\"trainer\",\n", + ")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ - "## Pre-requisites" + "To stop the Tensorboard server, you can use the `stop_tensorboard` utility function." 
] }, { - "cell_type": "markdown", + "cell_type": "code", + "execution_count": null, "metadata": {}, + "outputs": [], "source": [ - "In order to run this example, you need to have installed:\n", - "\n", - "* [Docker](https://docs.docker.com/get-docker/)\n", - "* [K3D](https://k3d.io/v5.2.1/) \n", - "* [Kubectl](https://kubernetes.io/docs/tasks/tools/)" + "stop_tensorboard_server(\n", + " pipeline_name=\"mnist_pipeline\",\n", + " step_name=\"trainer\",\n", + ")" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "gotkJdTQz8j2" + }, + "source": [ + "# Transitioning to Kubeflow Pipelines" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "PMLU4cNW-Ei4" + }, + "source": [ + "We got pretty good results on the digits model that we trained, but at some point we want to get out of this notebook local stack and go to a stack which looks more like production. Here is where the ZenML [Kubeflow Pipelines](https://github.com/kubeflow/pipelines) integration helps!" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ - "## Define requirements" + "## Pre-requisites" ] }, { - "cell_type": "code", - "execution_count": null, + "cell_type": "markdown", "metadata": {}, - "outputs": [], "source": [ - "%%writefile requirements.txt\n", - "scikit-learn\n", - "pandas\n", - "numpy" + "In order to run this example, you need to have installed:\n", + "\n", + "* [Docker](https://docs.docker.com/get-docker/)\n", + "* [K3D](https://k3d.io/v5.2.1/) \n", + "* [Kubectl](https://kubernetes.io/docs/tasks/tools/)" ] }, { @@ -478,7 +559,7 @@ "metadata": {}, "outputs": [], "source": [ - "%%writefile run.py\n", + "%%writefile run-kubeflow.py\n", "# Copyright (c) ZenML GmbH 2021. 
All Rights Reserved.\n", "#\n", "# Licensed under the Apache License, Version 2.0 (the \"License\");\n", @@ -497,24 +578,26 @@ "import os\n", "\n", "import numpy as np\n", - "from sklearn.base import ClassifierMixin\n", + "import tensorflow as tf\n", "\n", - "from zenml.integrations.sklearn.helpers.digits import get_digits, get_digits_model\n", + "from zenml.integrations.constants import TENSORFLOW\n", "from zenml.pipelines import pipeline\n", - "from zenml.steps import step\n", - "from zenml.steps.step_output import Output\n", - "\n", - "# Path to a pip requirements file that contains requirements necessary to run\n", - "# the pipeline\n", - "requirements_file = os.path.join(os.path.dirname(__file__), \"requirements.txt\")\n", + "from zenml.repository import Repository\n", + "from zenml.steps import BaseStepConfig, Output, StepContext, step\n", "\n", "\n", "@step\n", "def importer() -> Output(\n", - " X_train=np.ndarray, X_test=np.ndarray, y_train=np.ndarray, y_test=np.ndarray\n", + " X_train=np.ndarray,\n", + " X_test=np.ndarray,\n", + " y_train=np.ndarray,\n", + " y_test=np.ndarray,\n", "):\n", - " \"\"\"Loads the digits array as normal numpy arrays.\"\"\"\n", - " X_train, X_test, y_train, y_test = get_digits()\n", + " \"\"\"Download the MNIST data store it as an artifact\"\"\"\n", + " (X_train, y_train), (\n", + " X_test,\n", + " y_test,\n", + " ) = tf.keras.datasets.mnist.load_data()\n", " return X_train, X_test, y_train, y_test\n", "\n", "\n", @@ -528,14 +611,48 @@ " return X_train_normed, X_test_normed\n", "\n", "\n", - "@step(enable_cache=False)\n", + "class TrainerConfig(BaseStepConfig):\n", + " \"\"\"Trainer params\"\"\"\n", + "\n", + " epochs: int = 1\n", + " lr: float = 0.001\n", + "\n", + "\n", + "@step\n", "def trainer(\n", " X_train: np.ndarray,\n", " y_train: np.ndarray,\n", - ") -> ClassifierMixin:\n", - " \"\"\"Train a simple sklearn classifier for the digits dataset.\"\"\"\n", - " model = get_digits_model()\n", - " model.fit(X_train, y_train)\n", 
+ " context: StepContext,\n", + " config: TrainerConfig,\n", + ") -> tf.keras.Model:\n", + " \"\"\"Train a neural net from scratch to recognize MNIST digits return our\n", + " model or the learner\"\"\"\n", + " model = tf.keras.Sequential(\n", + " [\n", + " tf.keras.layers.Flatten(input_shape=(28, 28)),\n", + " tf.keras.layers.Dense(10, activation=\"relu\"),\n", + " tf.keras.layers.Dense(10),\n", + " ]\n", + " )\n", + "\n", + " log_dir = os.path.join(context.get_output_artifact_uri(), \"logs\")\n", + " tensorboard_callback = tf.keras.callbacks.TensorBoard(\n", + " log_dir=log_dir, histogram_freq=1\n", + " )\n", + "\n", + " model.compile(\n", + " optimizer=tf.keras.optimizers.Adam(config.lr),\n", + " loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),\n", + " metrics=[\"accuracy\"],\n", + " )\n", + "\n", + " model.fit(\n", + " X_train,\n", + " y_train,\n", + " epochs=config.epochs,\n", + " callbacks=[tensorboard_callback],\n", + " )\n", + "\n", " return model\n", "\n", "\n", @@ -543,15 +660,16 @@ "def evaluator(\n", " X_test: np.ndarray,\n", " y_test: np.ndarray,\n", - " model: ClassifierMixin,\n", + " model: tf.keras.Model,\n", ") -> float:\n", " \"\"\"Calculate the accuracy on the test set\"\"\"\n", - " test_acc = model.score(X_test, y_test)\n", + "\n", + " _, test_acc = model.evaluate(X_test, y_test, verbose=2)\n", " logging.info(f\"Test accuracy: {test_acc}\")\n", " return test_acc\n", "\n", "\n", - "@pipeline(requirements_file=requirements_file)\n", + "@pipeline(required_integrations=[TENSORFLOW], enable_cache=False)\n", "def mnist_pipeline(\n", " importer,\n", " normalizer,\n", @@ -583,7 +701,7 @@ "outputs": [], "source": [ "# Initialize a new pipeline\n", - "!python run.py" + "!python run-kubeflow.py" ] }, { @@ -839,7 +957,7 @@ "provenance": [] }, "kernelspec": { - "display_name": "Python 3", + "display_name": "Python 3 (ipykernel)", "language": "python", "name": "python3" }, @@ -853,7 +971,7 @@ "name": "python", "nbconvert_exporter": 
"python", "pygments_lexer": "ipython3", - "version": "3.6.9" + "version": "3.8.10" } }, "nbformat": 4, diff --git a/examples/kubeflow/assets/tensorboard-01.png b/examples/kubeflow/assets/tensorboard-01.png new file mode 100644 index 00000000000..1e1bd1cc1af Binary files /dev/null and b/examples/kubeflow/assets/tensorboard-01.png differ diff --git a/examples/kubeflow/assets/tensorboard-02.png b/examples/kubeflow/assets/tensorboard-02.png new file mode 100644 index 00000000000..c6e393fdcc6 Binary files /dev/null and b/examples/kubeflow/assets/tensorboard-02.png differ diff --git a/examples/kubeflow/assets/tensorboard-03.png b/examples/kubeflow/assets/tensorboard-03.png new file mode 100644 index 00000000000..f5ae02d06e9 Binary files /dev/null and b/examples/kubeflow/assets/tensorboard-03.png differ diff --git a/examples/kubeflow/assets/tensorboard-kubeflow-ui.png b/examples/kubeflow/assets/tensorboard-kubeflow-ui.png new file mode 100644 index 00000000000..41623ca4445 Binary files /dev/null and b/examples/kubeflow/assets/tensorboard-kubeflow-ui.png differ diff --git a/examples/kubeflow/assets/tensorboard-kubeflow-vis.png b/examples/kubeflow/assets/tensorboard-kubeflow-vis.png new file mode 100644 index 00000000000..6a8e6de43ab Binary files /dev/null and b/examples/kubeflow/assets/tensorboard-kubeflow-vis.png differ diff --git a/examples/kubeflow/pipeline.py b/examples/kubeflow/pipeline.py index 18727b207bf..4d33b621bcd 100644 --- a/examples/kubeflow/pipeline.py +++ b/examples/kubeflow/pipeline.py @@ -13,18 +13,14 @@ # permissions and limitations under the License. 
import logging -from datetime import datetime, timedelta +import os import numpy as np -from sklearn.base import ClassifierMixin +import tensorflow as tf -from zenml.integrations.constants import SKLEARN -from zenml.integrations.sklearn.helpers.digits import ( - get_digits, - get_digits_model, -) -from zenml.pipelines import Schedule, pipeline -from zenml.steps import Output, step +from zenml.integrations.constants import TENSORFLOW +from zenml.pipelines import pipeline +from zenml.steps import BaseStepConfig, Output, StepContext, step @step @@ -34,8 +30,11 @@ def importer() -> Output( y_train=np.ndarray, y_test=np.ndarray, ): - """Loads the digits array as normal numpy arrays.""" - X_train, X_test, y_train, y_test = get_digits() + """Download the MNIST data store it as an artifact""" + (X_train, y_train), ( + X_test, + y_test, + ) = tf.keras.datasets.mnist.load_data() return X_train, X_test, y_train, y_test @@ -49,15 +48,48 @@ def normalizer( return X_train_normed, X_test_normed -@step(enable_cache=False) +class TrainerConfig(BaseStepConfig): + """Trainer params""" + + epochs: int = 5 + lr: float = 0.001 + + +@step(enable_cache=True) def trainer( X_train: np.ndarray, y_train: np.ndarray, -) -> ClassifierMixin: - """Train a simple sklearn classifier for the digits dataset.""" - model = get_digits_model() + context: StepContext, + config: TrainerConfig, +) -> tf.keras.Model: + """Train a neural net from scratch to recognize MNIST digits return our + model or the learner""" + model = tf.keras.Sequential( + [ + tf.keras.layers.Flatten(input_shape=(28, 28)), + tf.keras.layers.Dense(10, activation="relu"), + tf.keras.layers.Dense(10), + ] + ) + + log_dir = os.path.join(context.get_output_artifact_uri(), "logs") + tensorboard_callback = tf.keras.callbacks.TensorBoard( + log_dir=log_dir, histogram_freq=1 + ) + + model.compile( + optimizer=tf.keras.optimizers.Adam(config.lr), + loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True), + metrics=["accuracy"], + 
) + + model.fit( + X_train, + y_train, + epochs=config.epochs, + callbacks=[tensorboard_callback], + ) - model.fit(X_train, y_train) return model @@ -65,15 +97,16 @@ def trainer( def evaluator( X_test: np.ndarray, y_test: np.ndarray, - model: ClassifierMixin, + model: tf.keras.Model, ) -> float: """Calculate the accuracy on the test set""" - test_acc = model.score(X_test, y_test) + + _, test_acc = model.evaluate(X_test, y_test, verbose=2) logging.info(f"Test accuracy: {test_acc}") return test_acc -@pipeline(required_integrations=[SKLEARN]) +@pipeline(required_integrations=[TENSORFLOW], enable_cache=True) def mnist_pipeline( importer, normalizer, @@ -82,8 +115,6 @@ def mnist_pipeline( ): # Link all the steps together X_train, X_test, y_train, y_test = importer() - X_trained_normed, X_test_normed = normalizer( - X_train=X_train, X_test=X_test - ) + X_trained_normed, X_test_normed = normalizer(X_train=X_train, X_test=X_test) model = trainer(X_train=X_trained_normed, y_train=y_train) evaluator(X_test=X_test_normed, y_test=y_test, model=model) diff --git a/examples/kubeflow/requirements.txt b/examples/kubeflow/requirements.txt deleted file mode 100644 index 9dbbfe49bcb..00000000000 --- a/examples/kubeflow/requirements.txt +++ /dev/null @@ -1,3 +0,0 @@ -scikit-learn -pandas -numpy diff --git a/examples/kubeflow/run.py b/examples/kubeflow/run.py index 5766ebe692b..9bc472fff87 100644 --- a/examples/kubeflow/run.py +++ b/examples/kubeflow/run.py @@ -12,22 +12,60 @@ # or implied. See the License for the specific language governing # permissions and limitations under the License. 
-from datetime import datetime, timedelta - +import click from pipeline import ( + TrainerConfig, mnist_pipeline, importer, trainer, evaluator, normalizer, ) +from rich import print +from zenml.integrations.tensorflow.services import ( + TensorboardService, + TensorboardServiceConfig, +) +from zenml.integrations.tensorflow.visualizers import ( + visualize_tensorboard, + stop_tensorboard_server, +) +from zenml.repository import Repository + + +@click.command() +@click.option("--epochs", default=5, help="Number of epochs for training") +@click.option("--lr", default=0.001, help="Learning rate for training") +@click.option( + "--stop-tensorboard", + is_flag=True, + default=False, + help="Stop the Tensorboard server", +) +def main(epochs: int, lr: float, stop_tensorboard: bool): + """Run the mnist example pipeline""" + + if stop_tensorboard: + stop_tensorboard_server( + pipeline_name="mnist_pipeline", + step_name="trainer", + ) + return -if __name__ == "__main__": # Run the pipeline p = mnist_pipeline( importer=importer(), normalizer=normalizer(), - trainer=trainer(), + trainer=trainer(config=TrainerConfig(epochs=epochs, lr=lr)), evaluator=evaluator(), ) p.run() + + visualize_tensorboard( + pipeline_name="mnist_pipeline", + step_name="trainer", + ) + + +if __name__ == "__main__": + main() diff --git a/examples/kubeflow/run_schedule.py b/examples/kubeflow/run_schedule.py index f801816bad5..da5767042ad 100644 --- a/examples/kubeflow/run_schedule.py +++ b/examples/kubeflow/run_schedule.py @@ -20,8 +20,8 @@ trainer, evaluator, normalizer, - Schedule, ) +from zenml.pipelines import Schedule if __name__ == "__main__": # Run the pipeline diff --git a/examples/kubeflow/setup.sh b/examples/kubeflow/setup.sh index e08656c7f1e..5e62ff15af6 100644 --- a/examples/kubeflow/setup.sh +++ b/examples/kubeflow/setup.sh @@ -20,9 +20,14 @@ setup_stack () { } pre_run () { - zenml integration install kubeflow sklearn + zenml integration install kubeflow tensorflow } pre_run_forced () { - 
zenml integration install kubeflow sklearn -f + zenml integration install kubeflow tensorflow -f +} + +post_run () { + # cleanup the last local ZenML daemon started by the example + pkill -n -f zenml.services.local.local_daemon_entrypoint || true } diff --git a/src/zenml/__init__.py b/src/zenml/__init__.py index c5b42b21a4a..4de9b897a84 100644 --- a/src/zenml/__init__.py +++ b/src/zenml/__init__.py @@ -14,10 +14,6 @@ import os -from rich.traceback import install - -install(show_locals=True) - ROOT_DIR = os.path.dirname(os.path.abspath(__file__)) with open(os.path.join(ROOT_DIR, "VERSION")) as version_file: diff --git a/src/zenml/integrations/aws/io/__init__.py b/src/zenml/integrations/aws/io/__init__.py index f781f82f7d0..17fa3d17553 100644 --- a/src/zenml/integrations/aws/io/__init__.py +++ b/src/zenml/integrations/aws/io/__init__.py @@ -16,4 +16,4 @@ from zenml.integrations.aws.io.s3_plugin import ZenS3 -DEFAULT_FILESYSTEM_REGISTRY.register(ZenS3, 15) +DEFAULT_FILESYSTEM_REGISTRY.register(ZenS3, 5) diff --git a/src/zenml/integrations/azure/io/__init__.py b/src/zenml/integrations/azure/io/__init__.py index 31509bfef47..0bba3890df6 100644 --- a/src/zenml/integrations/azure/io/__init__.py +++ b/src/zenml/integrations/azure/io/__init__.py @@ -16,4 +16,4 @@ from zenml.integrations.azure.io.azure_plugin import ZenAzure -DEFAULT_FILESYSTEM_REGISTRY.register(ZenAzure, 15) +DEFAULT_FILESYSTEM_REGISTRY.register(ZenAzure, 5) diff --git a/src/zenml/integrations/gcp/io/__init__.py b/src/zenml/integrations/gcp/io/__init__.py index 63337084706..321aefd3506 100644 --- a/src/zenml/integrations/gcp/io/__init__.py +++ b/src/zenml/integrations/gcp/io/__init__.py @@ -16,4 +16,4 @@ from zenml.integrations.gcp.io.gcs_plugin import ZenGCS -DEFAULT_FILESYSTEM_REGISTRY.register(ZenGCS, 15) +DEFAULT_FILESYSTEM_REGISTRY.register(ZenGCS, 5) diff --git a/src/zenml/integrations/kubeflow/container_entrypoint.py b/src/zenml/integrations/kubeflow/container_entrypoint.py index 
bee5e4c9189..96eb93e2518 100644 --- a/src/zenml/integrations/kubeflow/container_entrypoint.py +++ b/src/zenml/integrations/kubeflow/container_entrypoint.py @@ -37,7 +37,9 @@ from tfx.types import artifact, channel, standard_artifacts from tfx.types.channel import Property +from zenml.artifact_stores import LocalArtifactStore from zenml.artifacts.base_artifact import BaseArtifact +from zenml.artifacts.model_artifact import ModelArtifact from zenml.artifacts.type_registry import type_registry from zenml.exceptions import RepositoryNotFoundError from zenml.integrations.registry import integration_registry @@ -277,13 +279,21 @@ def _dump_output_populated_artifacts( if ( spec.artifact_spec.type.name == standard_artifacts.ModelRun.TYPE_NAME + or spec.artifact_spec.type.name == ModelArtifact.TYPE_NAME ): output_model = execution_info.output_dict[name][0] - + source = output_model.uri + + # For local artifact repository, use a path that is relative to + # the point where the local artifact folder is mounted as a volume + artifact_store = Repository().active_stack.artifact_store + if isinstance(artifact_store, LocalArtifactStore): + source = os.path.relpath(source, artifact_store.path) + source = f"volume://local-artifact-store/{source}" # Add Tensorboard view. 
tensorboard_output = { "type": "tensorboard", - "source": output_model.uri, + "source": source, } outputs.append(tensorboard_output) diff --git a/src/zenml/integrations/kubeflow/orchestrators/kubeflow_orchestrator.py b/src/zenml/integrations/kubeflow/orchestrators/kubeflow_orchestrator.py index fa6c41e7260..855f0f41091 100644 --- a/src/zenml/integrations/kubeflow/orchestrators/kubeflow_orchestrator.py +++ b/src/zenml/integrations/kubeflow/orchestrators/kubeflow_orchestrator.py @@ -21,6 +21,7 @@ from kfp_server_api.exceptions import ApiException import zenml.io.utils +from zenml.artifact_stores import LocalArtifactStore from zenml.enums import OrchestratorFlavor, StackComponentType from zenml.exceptions import ProvisioningError from zenml.integrations.kubeflow.orchestrators import local_deployment_utils @@ -161,6 +162,17 @@ def run_pipeline( runtime_configuration: "RuntimeConfiguration", ) -> Any: """Runs a pipeline on Kubeflow Pipelines.""" + # First check whether it's running in a notebook + from zenml.environment import Environment + + if Environment.in_notebook(): + raise RuntimeError( + "The Kubeflow orchestrator cannot run pipelines in a notebook environment. The reason is that it is " + "non-trivial to create a Docker image of a notebook. Please consider refactoring your notebook cells " + "into separate scripts in a Python module and run the code outside of a notebook when using this " + "orchestrator." 
+ ) + from zenml.integrations.kubeflow.docker_utils import get_image_digest image_name = self.get_docker_image_name(pipeline.name) @@ -403,6 +415,13 @@ def provision(self) -> None: kubernetes_context=kubernetes_context ) + artifact_store = Repository().active_stack.artifact_store + if isinstance(artifact_store, LocalArtifactStore): + local_deployment_utils.add_hostpath_to_kubeflow_pipelines( + kubernetes_context=kubernetes_context, + local_path=artifact_store.path, + ) + local_deployment_utils.start_kfp_ui_daemon( pid_file_path=self._pid_file_path, log_file_path=self.log_file, diff --git a/src/zenml/integrations/kubeflow/orchestrators/local_deployment_utils.py b/src/zenml/integrations/kubeflow/orchestrators/local_deployment_utils.py index 2304fae7b77..f0f17fb4bbf 100644 --- a/src/zenml/integrations/kubeflow/orchestrators/local_deployment_utils.py +++ b/src/zenml/integrations/kubeflow/orchestrators/local_deployment_utils.py @@ -228,6 +228,129 @@ def deploy_kubeflow_pipelines(kubernetes_context: str) -> None: logger.info("Finished Kubeflow Pipelines setup.") +def add_hostpath_to_kubeflow_pipelines( + kubernetes_context: str, local_path: str +) -> None: + """Patches the Kubeflow Pipelines deployment to mount a local folder as + a hostpath for visualization purposes. + + This function reconfigures the Kubeflow pipelines deployment to use a + shared local folder to support loading the Tensorboard viewer and other + pipeline visualization results from a local artifact store, as described + here: + + https://github.com/kubeflow/pipelines/blob/master/docs/config/volume-support.md + + Args: + kubernetes_context: The kubernetes context on which Kubeflow Pipelines + should be patched. + local_path: The path to the local folder to mount as a hostpath. 
+ """ + logger.info("Patching Kubeflow Pipelines to mount a local folder.") + + pod_template = { + "spec": { + "serviceAccountName": "kubeflow-pipelines-viewer", + "containers": [ + { + "volumeMounts": [ + { + "mountPath": local_path, + "name": "local-artifact-store", + } + ] + } + ], + "volumes": [ + { + "hostPath": { + "path": local_path, + "type": "Directory", + }, + "name": "local-artifact-store", + } + ], + } + } + pod_template_json = json.dumps(pod_template, indent=2) + config_map_data = {"data": {"viewer-pod-template.json": pod_template_json}} + config_map_data_json = json.dumps(config_map_data, indent=2) + + logger.debug( + "Adding host path volume for local path `%s` to kubeflow pipeline" + "viewer pod template configuration.", + local_path, + ) + subprocess.check_call( + [ + "kubectl", + "--context", + kubernetes_context, + "-n", + "kubeflow", + "patch", + "configmap/ml-pipeline-ui-configmap", + "--type", + "merge", + "-p", + config_map_data_json, + ] + ) + + deployment_patch = { + "spec": { + "template": { + "spec": { + "containers": [ + { + "name": "ml-pipeline-ui", + "volumeMounts": [ + { + "mountPath": local_path, + "name": "local-artifact-store", + } + ], + } + ], + "volumes": [ + { + "hostPath": { + "path": local_path, + "type": "Directory", + }, + "name": "local-artifact-store", + } + ], + } + } + } + } + deployment_patch_json = json.dumps(deployment_patch, indent=2) + + logger.debug( + "Adding host path volume for local path `%s` to the kubeflow UI", + local_path, + ) + subprocess.check_call( + [ + "kubectl", + "--context", + kubernetes_context, + "-n", + "kubeflow", + "patch", + "deployment/ml-pipeline-ui", + "--type", + "strategic", + "-p", + deployment_patch_json, + ] + ) + wait_until_kubeflow_pipelines_ready(kubernetes_context=kubernetes_context) + + logger.info("Finished patching Kubeflow Pipelines setup.") + + def start_kfp_ui_daemon( pid_file_path: str, log_file_path: str, port: int ) -> None: diff --git 
a/src/zenml/integrations/tensorflow/__init__.py b/src/zenml/integrations/tensorflow/__init__.py index 6250e9ee5ad..d45d3b16cd9 100644 --- a/src/zenml/integrations/tensorflow/__init__.py +++ b/src/zenml/integrations/tensorflow/__init__.py @@ -19,12 +19,17 @@ class TensorflowIntegration(Integration): """Definition of Tensorflow integration for ZenML.""" NAME = TENSORFLOW - REQUIREMENTS = ["tensorflow==2.8.0"] + REQUIREMENTS = ["tensorflow==2.8.0", "tensorflow_io==0.24.0"] @classmethod def activate(cls) -> None: """Activates the integration.""" + # need to import this explicitly to load the Tensorflow file IO support + # for S3 and other file systems + import tensorflow_io # type: ignore [import] + from zenml.integrations.tensorflow import materializers # noqa + from zenml.integrations.tensorflow import services # noqa TensorflowIntegration.check_installation() diff --git a/src/zenml/integrations/tensorflow/services/__init__.py b/src/zenml/integrations/tensorflow/services/__init__.py new file mode 100644 index 00000000000..8ba2dcf8ff5 --- /dev/null +++ b/src/zenml/integrations/tensorflow/services/__init__.py @@ -0,0 +1,17 @@ +# Copyright (c) ZenML GmbH 2022. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +# or implied. See the License for the specific language governing +# permissions and limitations under the License. 
+from zenml.integrations.tensorflow.services.tensorboard_service import ( # noqa + TensorboardService, + TensorboardServiceConfig, +) diff --git a/src/zenml/integrations/tensorflow/services/tensorboard_service.py b/src/zenml/integrations/tensorflow/services/tensorboard_service.py new file mode 100644 index 00000000000..14b3632717f --- /dev/null +++ b/src/zenml/integrations/tensorflow/services/tensorboard_service.py @@ -0,0 +1,122 @@ +# Copyright (c) ZenML GmbH 2022. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +# or implied. See the License for the specific language governing +# permissions and limitations under the License. + +from typing import Any, Dict, Union + +from tensorboard import default, program # type: ignore [import] +from tensorboard.uploader import uploader_subcommand # type: ignore [import] + +from zenml.logger import get_logger +from zenml.services import ( + HTTPEndpointHealthMonitor, + HTTPEndpointHealthMonitorConfig, + LocalDaemonService, + LocalDaemonServiceConfig, + LocalDaemonServiceEndpoint, + LocalDaemonServiceEndpointConfig, + ServiceEndpointProtocol, + ServiceType, +) + +logger = get_logger(__name__) + + +class TensorboardServiceConfig(LocalDaemonServiceConfig): + """Tensorboard service configuration. + + Attributes: + logdir: location of Tensorboard log files. + max_reload_threads: the max number of threads that TensorBoard can use + to reload runs. Each thread reloads one run at a time. + reload_interval: how often the backend should load more data, in + seconds. Set to 0 to load just once at startup. 
+ """ + + logdir: str + max_reload_threads: int = 1 + reload_interval: int = 5 + + +class TensorboardService(LocalDaemonService): + """Tensorboard service that can be used to start a local Tensorboard server + for one or more models. + + Attributes: + SERVICE_TYPE: a service type descriptor with information describing + the Tensorboard service class + config: service configuration + endpoint: optional service endpoint + """ + + SERVICE_TYPE = ServiceType( + name="tensorboard", + type="visualization", + flavor="tensorboard", + description="Tensorboard visualization service", + ) + + config: TensorboardServiceConfig + endpoint: LocalDaemonServiceEndpoint + + def __init__( + self, + config: Union[TensorboardServiceConfig, Dict[str, Any]], + **attrs: Any, + ) -> None: + # ensure that the endpoint is created before the service is initialized + # TODO [HIGH]: implement a service factory or builder for Tensorboard + # deployment services + if ( + isinstance(config, TensorboardServiceConfig) + and "endpoint" not in attrs + ): + endpoint = LocalDaemonServiceEndpoint( + config=LocalDaemonServiceEndpointConfig( + protocol=ServiceEndpointProtocol.HTTP, + ), + monitor=HTTPEndpointHealthMonitor( + config=HTTPEndpointHealthMonitorConfig( + healthcheck_uri_path="", + use_head_request=True, + ) + ), + ) + attrs["endpoint"] = endpoint + super().__init__(config=config, **attrs) + + def run(self) -> None: + logger.info( + "Starting Tensorboard service as blocking " + "process... press CTRL+C once to stop it." 
+ ) + + self.endpoint.prepare_for_start() + + try: + tensorboard = program.TensorBoard( + plugins=default.get_plugins(), + subcommands=[uploader_subcommand.UploaderSubcommand()], + ) + tensorboard.configure( + logdir=self.config.logdir, + port=self.endpoint.status.port, + host="localhost", + max_reload_threads=self.config.max_reload_threads, + reload_interval=self.config.reload_interval, + ) + tensorboard.main() + except KeyboardInterrupt: + logger.info( + "Tensorboard service stopped. Resuming normal execution." + ) diff --git a/src/zenml/integrations/tensorflow/visualizers/__init__.py b/src/zenml/integrations/tensorflow/visualizers/__init__.py new file mode 100644 index 00000000000..8f844d9b577 --- /dev/null +++ b/src/zenml/integrations/tensorflow/visualizers/__init__.py @@ -0,0 +1,19 @@ +# Copyright (c) ZenML GmbH 2022. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +# or implied. See the License for the specific language governing +# permissions and limitations under the License. + +from zenml.integrations.tensorflow.visualizers.tensorboard_visualizer import ( # noqa + TensorboardVisualizer, + stop_tensorboard_server, + visualize_tensorboard, +) diff --git a/src/zenml/integrations/tensorflow/visualizers/tensorboard_visualizer.py b/src/zenml/integrations/tensorflow/visualizers/tensorboard_visualizer.py new file mode 100644 index 00000000000..aaae42cfbe4 --- /dev/null +++ b/src/zenml/integrations/tensorflow/visualizers/tensorboard_visualizer.py @@ -0,0 +1,219 @@ +# Copyright (c) ZenML GmbH 2022. All Rights Reserved. 
+# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +# or implied. See the License for the specific language governing +# permissions and limitations under the License. + +import os +import sys +from typing import Any, Optional, cast + +import psutil +from rich import print +from tensorboard import notebook # type: ignore [import] +from tensorboard.manager import ( # type: ignore [import] + TensorBoardInfo, + get_all, +) + +from zenml.artifacts.model_artifact import ModelArtifact +from zenml.environment import Environment +from zenml.integrations.tensorflow.services.tensorboard_service import ( + TensorboardService, + TensorboardServiceConfig, +) +from zenml.logger import get_logger +from zenml.post_execution import StepView +from zenml.repository import Repository +from zenml.visualizers import BaseStepVisualizer + +logger = get_logger(__name__) + + +class TensorboardVisualizer(BaseStepVisualizer): + """The implementation of a Tensorboard Visualizer.""" + + @classmethod + def find_running_tensorboard_server( + cls, logdir: str + ) -> Optional[TensorBoardInfo]: + """Find a local Tensorboard server instance running for the supplied + logdir location and return its TensorBoardInfo. + + Returns: + The TensorBoardInfo describing the running Tensorboard server or + None if no server is running for the supplied logdir location. 
+ """ + for server in get_all(): + if ( + server.logdir == logdir + and server.pid + and psutil.pid_exists(server.pid) + ): + return server + return None + + def visualize( + self, + object: StepView, + *args: Any, + height: int = 800, + **kwargs: Any, + ) -> None: + """Start a Tensorboard server to visualize all models logged as + artifacts by the indicated step. The server will monitor and display + all the models logged by past and future step runs. + + Args: + object: StepView fetched from run.get_step(). + height: Height of the generated visualization. + """ + for _, artifact_view in object.outputs.items(): + # filter out anything but model artifacts + if artifact_view.type == ModelArtifact.TYPE_NAME: + logdir = os.path.dirname(artifact_view.uri) + + # first check if a Tensorboard server is already running for + # the same logdir location and use that one + running_server = self.find_running_tensorboard_server(logdir) + if running_server: + self.visualize_tensorboard(running_server.port, height) + return + + if sys.platform == "win32": + # Daemon service functionality is currently not supported on Windows + print( + "You can run:\n" + f"[italic green] tensorboard --logdir {logdir}" + "[/italic green]\n" + "...to visualize the Tensorboard logs for your trained model." + ) + else: + # start a new Tensorboard server + service = TensorboardService( + TensorboardServiceConfig( + logdir=logdir, + ) + ) + service.start(timeout=20) + if service.endpoint.status.port: + self.visualize_tensorboard( + service.endpoint.status.port, height + ) + return + + def visualize_tensorboard( + self, + port: int, + height: int, + ) -> None: + """Generate a visualization of a Tensorboard. + + Args: + port: the TCP port where the Tensorboard server is listening for + requests. + height: Height of the generated visualization. + logdir: The logdir location for the Tensorboard server. 
+ """ + if Environment.in_notebook(): + + notebook.display(port, height=height) + return + + print( + "You can visit:\n" + f"[italic green] http://localhost:{port}/[/italic green]\n" + "...to visualize the Tensorboard logs for your trained model." + ) + + def stop( + self, + object: StepView, + ) -> None: + """Stop the Tensorboard server previously started for a pipeline step. + + Args: + object: StepView fetched from run.get_step(). + """ + for _, artifact_view in object.outputs.items(): + # filter out anything but model artifacts + if artifact_view.type == ModelArtifact.TYPE_NAME: + logdir = os.path.dirname(artifact_view.uri) + + # first check if a Tensorboard server is already running for + # the same logdir location and use that one + running_server = self.find_running_tensorboard_server(logdir) + if not running_server: + return + + logger.debug( + "Stopping tensorboard server with PID '%d' ...", + running_server.pid, + ) + try: + p = psutil.Process(running_server.pid) + except psutil.Error: + logger.error( + "Could not find process for PID '%d' ...", + running_server.pid, + ) + continue + p.kill() + return + + +def get_step(pipeline_name: str, step_name: str) -> StepView: + """Get the StepView for the specified pipeline and step name. + + Args: + pipeline_name: The name of the pipeline. + step_name: The name of the step. + + Returns: + The StepView for the specified pipeline and step name. 
+ """ + repo = Repository() + pipeline = repo.get_pipeline(pipeline_name) + if pipeline is None: + raise RuntimeError(f"No pipeline with name `{pipeline_name}` was found") + + last_run = pipeline.runs[-1] + step = last_run.get_step(name=step_name) + if step is None: + raise RuntimeError( + f"No pipeline step with name `{step_name}` was found in " + f"pipeline `{pipeline_name}`" + ) + return cast(StepView, step) + + +def visualize_tensorboard(pipeline_name: str, step_name: str) -> None: + """Start a Tensorboard server to visualize all models logged as output by + the named pipeline step. The server will monitor and display all the models + logged by past and future step runs. + + Args: + pipeline_name: the name of the pipeline + step_name: pipeline step name + """ + step = get_step(pipeline_name, step_name) + TensorboardVisualizer().visualize(step) + + +def stop_tensorboard_server(pipeline_name: str, step_name: str) -> None: + """Stop the Tensorboard server previously started for a pipeline step. 
+ + Args: + pipeline_name: the name of the pipeline + step_name: pipeline step name + """ + step = get_step(pipeline_name, step_name) + TensorboardVisualizer().stop(step) diff --git a/src/zenml/logger.py b/src/zenml/logger.py index 6f8d7233de0..a4718a78ca6 100644 --- a/src/zenml/logger.py +++ b/src/zenml/logger.py @@ -20,6 +20,7 @@ from typing import Any, Dict from absl import logging as absl_logging +from rich.traceback import install as rich_tb_install from zenml.constants import ( ABSL_LOGGING_VERBOSITY, @@ -103,6 +104,8 @@ def set_root_verbosity() -> None: """Set the root verbosity.""" level = get_logging_level() if level != LoggingLevels.NOTSET: + rich_tb_install(show_locals=(level == LoggingLevels.DEBUG)) + logging.basicConfig(level=level.value) get_logger(__name__).debug( f"Logging set to level: " f"{logging.getLevelName(level.value)}" diff --git a/src/zenml/metadata_stores/base_metadata_store.py b/src/zenml/metadata_stores/base_metadata_store.py index d61baad268e..7cc9b2d43a1 100644 --- a/src/zenml/metadata_stores/base_metadata_store.py +++ b/src/zenml/metadata_stores/base_metadata_store.py @@ -63,8 +63,12 @@ def flavor(self) -> MetadataStoreFlavor: def store(self) -> metadata_store.MetadataStore: """General property that hooks into TFX metadata store.""" # TODO [ENG-133]: this always gets recreated, is this intended? + config = self.get_tfx_metadata_config() return metadata_store.MetadataStore( - self.get_tfx_metadata_config(), enable_upgrade_migration=True + config, + enable_upgrade_migration=isinstance( + config, metadata_store_pb2.ConnectionConfig + ), ) @abstractmethod