Initial workflow implementation
martynas-subonis committed Sep 7, 2024
1 parent afb278d commit aae0841
Showing 27 changed files with 5,192 additions and 413 deletions.
37 changes: 37 additions & 0 deletions .github/workflows/build_docker.yaml
@@ -0,0 +1,37 @@
name: Build Docker Images

on:
  pull_request:
    branches:
      - main

jobs:
  build-docker:
    runs-on: ubuntu-latest
    defaults:
      run:
        working-directory: ${{ matrix.directory }}
    strategy:
      matrix:
        directory: [
          data_prep,
          train,
          eval
        ]

    steps:
      - name: Checkout repository
        uses: actions/checkout@v4

      - name: Set up Docker Buildx
        uses: docker/setup-buildx-action@v3

      - name: Extract component name
        id: extract-component-name
        run: echo "component_name=$(basename ${{ matrix.directory }})" >> $GITHUB_OUTPUT

      - name: Build Docker
        uses: docker/build-push-action@v6
        with:
          context: ${{ matrix.directory }}
          tags: ${{ steps.extract-component-name.outputs.component_name }}:latest
36 changes: 36 additions & 0 deletions .github/workflows/code_check.yaml
@@ -0,0 +1,36 @@
name: Code Quality Check

on:
  pull_request:
    branches:
      - main

jobs:
  code-check:
    runs-on: ubuntu-latest
    steps:
      - name: Checkout repository
        uses: actions/checkout@v4

      - name: Setup Python
        uses: actions/setup-python@v5
        with:
          python-version: "3.12"

      - name: Install Poetry
        uses: snok/install-poetry@v1
        with:
          version: 1.8.3

      - name: Install dependencies using Poetry
        run: poetry install

      - name: Run ruff check
        run: poetry run ruff check

      - name: Run ruff format check
        run: poetry run ruff format --check

      - name: Run mypy
        run: poetry run mypy .
18 changes: 18 additions & 0 deletions .gitignore
@@ -1,3 +1,21 @@
# project specific
pipeline.py
pipeline.yaml
torch_model
onnx_model.onnx
loss_plot.png
local.py
train_images
val_images
test_images
train_split.json
val_split.json
test_split.json
confusion_matrix.png
f1_score_by_class.png
precision_by_class.png
recall_by_class.png

# pyenv
.python-version

143 changes: 143 additions & 0 deletions README.md
@@ -0,0 +1,143 @@
# ML Workflows

This repository showcases an implementation of a model training
[pipeline template](https://cloud.google.com/vertex-ai/docs/pipelines/create-pipeline-template). Although the project is
using [Kubeflow Pipelines (KFP)](https://www.kubeflow.org/docs/components/pipelines/), [Google Cloud Platform (GCP)](https://cloud.google.com/docs)
and [weather image classification dataset](https://www.kaggle.com/datasets/jehanbhathena/weather-dataset) for this case
study, the focus is on demonstrating key concepts like reproducibility, artifact tracking, and automation, rather than
the specifics of tools and implementation details.

Disclaimer — this project doesn't advocate for Kubeflow Pipelines as the definitive framework for developing machine
learning workflows (alternative frameworks will be provided in the appendix), nor does it endorse Vertex AI as the
optimal managed platform. The choice of these specific tools stems from pragmatic reasons — namely, their immediate
availability in my existing setup.

The `ml-workflows` repository follows the `standard` project structure and tooling proposed in
[py-manage](https://github.com/martynas-subonis/py-manage).

## Table of Contents

- [Hypothetical Problem Statement](#hypothetical-problem-statement)
- [Pipeline Overview](#pipeline-overview)
- [Prerequisites](#prerequisites)
- [Compile and Upload Pipeline Template](#compile-and-upload-pipeline-template)
- [Creating Pipeline Run From Your Template](#creating-pipeline-run-from-your-template)
- [Porting Pipeline Template to Different Platforms](#porting-pipeline-template-to-different-platforms)
- [Appendix](#appendix)

## Hypothetical Problem Statement

Let’s assume we are running a renewable energy company that seeks to optimize solar and wind farm operations across
diverse geographic locations. By implementing an AI system that can automatically recognize weather conditions from
images captured by on-site cameras, we can predict energy output more accurately and adjust operations in real-time.
This weather recognition capability would enable more efficient resource allocation and improve overall energy
production forecasting.

For this problem, we've acquired a
[“Weather Image Recognition”](https://www.kaggle.com/datasets/jehanbhathena/weather-dataset) dataset that we believe
will meet our needs. Our goal is to create a model capable of predicting 11 distinct weather conditions: dew, fog/smog,
frost, glaze, hail, lightning, rain, rainbow, rime, sandstorm, and snow. This diverse range of weather phenomena will
allow our AI system to provide comprehensive insights for optimizing our renewable energy operations.

The aim of our project is to develop a robust model training pipeline template that researchers and engineers can easily
reuse with different parameters. This template should accommodate varying data sources, data splits, random
seeds, and training epochs. The pipeline should guarantee reproducibility and ease of artifact tracking, as well
as a high level of automation.

## Pipeline Overview

![Pipeline Overview](readme_assets/overview.png)

The [pipeline](template.py) consists of three main components:

1. Data Preparation ([data_prep](data_prep)):

    - Splits the dataset into train, validation, and test sets.
    - Uses stratified sampling to maintain class distribution.
    - Outputs: train, validation, and test split information.

2. Model Training ([train](train)):

    - Implements reproducibility measures.
    - Uses the MobileNetV3-Small architecture with transfer learning.
    - Fine-tunes the classifier head for the specific problem domain.
    - Outputs: trained PyTorch model, ONNX model, training metrics, and loss plot.

3. Model Evaluation ([eval](eval)):

    - Evaluates the model on the test set.
    - Calculates evaluation metrics.
    - Outputs: confusion matrix, weighted precision, recall, and F1-score.
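The stratified splitting that `data_prep` performs can be illustrated with a small stdlib-only sketch (the paths and ratios below are hypothetical; the actual component delegates to `sklearn.model_selection.train_test_split` over GCS blob listings):

```python
import random
from collections import defaultdict


def stratified_split(paths: list[str], train_ratio: float, seed: int = 42) -> tuple[list[str], list[str]]:
    """Split image paths per class, so every split keeps the class distribution."""
    by_class: dict[str, list[str]] = defaultdict(list)
    for path in paths:
        by_class[path.split("/")[0]].append(path)  # class name is the top-level folder

    rng = random.Random(seed)  # fixed seed keeps the split reproducible
    train: list[str] = []
    rest: list[str] = []
    for class_paths in by_class.values():
        rng.shuffle(class_paths)
        cut = int(len(class_paths) * train_ratio)
        train.extend(class_paths[:cut])
        rest.extend(class_paths[cut:])
    return train, rest


paths = [f"rain/{i}.jpg" for i in range(10)] + [f"snow/{i}.jpg" for i in range(10)]
train, rest = stratified_split(paths, train_ratio=0.8)
# Each class contributes the same 80% share of its images to the train split.
```

In the pipeline, `rest` would be split once more, with the same stratification, into the validation and test sets.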

## Prerequisites

The implementation of this pipeline template is coupled with GCP. To compile and upload the pipeline template, as well
as to create a pipeline run from it, one has to:

- Have a [GCP project](https://cloud.google.com/docs).
- Enable [Vertex AI](https://cloud.google.com/vertex-ai) API.
- Have a [GCS](https://cloud.google.com/storage) bucket,
with ["Weather Image Recognition"](https://www.kaggle.com/datasets/jehanbhathena/weather-dataset) dataset uploaded.
- Have a GCS staging bucket, where Kubeflow Pipelines can persist its artifacts.
- Have a Docker type repository in [Artifact Registry](https://cloud.google.com/artifact-registry/docs), where built
images of components are pushed.
- Have a Kubeflow Pipelines type repository in Artifact Registry, where templates of pipelines are pushed.
- Have [gcloud cli](https://cloud.google.com/sdk/docs/install) installed.
- Correctly configure/authorize `gcloud`:

```bash
gcloud config set project $GCP_PROJECT
gcloud auth application-default login
```

- Have a local `.env` file with the following `env` variables set:

```text
KFP_REPOSITORY= # Your Kubeflow Pipelines type repository in Artifact Registry.
DATA_BUCKET= # Your GCS bucket with the "Weather Image Recognition" dataset.
STAGING_BUCKET= # Your GCS bucket, where Kubeflow Pipelines will persist its artifacts.
PREP_DATA_DOCKER_URI= # The URI of the data_prep component Docker image.
TRAIN_MODEL_DOCKER_URI= # The URI of the train component Docker image.
EVAL_MODEL_DOCKER_URI= # The URI of the eval component Docker image.
```
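A file like this is typically loaded with a tool such as python-dotenv; a minimal stdlib-only sketch of the same idea (the parsing rules are simplified and hypothetical, chosen to tolerate the inline `#` comments shown above):

```python
import os


def load_env(path: str = ".env") -> None:
    """Populate os.environ from simple KEY=VALUE lines, skipping blanks and comments."""
    with open(path) as f:
        for line in f:
            line = line.strip()
            if not line or line.startswith("#") or "=" not in line:
                continue
            key, _, value = line.partition("=")
            value = value.split("#", 1)[0].strip()  # drop an inline comment
            if value:
                os.environ.setdefault(key.strip(), value)  # do not clobber real env vars
```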

- Have [Poetry installed](https://python-poetry.org/docs/#installation).
- Have Python `~3.12` active in the project directory (using [pyenv](https://github.com/pyenv/pyenv) is advised).

## Compile and Upload Pipeline Template

With the [prerequisites](#prerequisites) met, compiling and uploading the pipeline template should be as simple as:

```bash
poetry install
poetry run template
```

## Creating Pipeline Run From Your Template

To create a pipeline run from the uploaded pipeline template, just follow the
[official Vertex AI documentation](https://cloud.google.com/vertex-ai/docs/pipelines/create-pipeline-template#create-pipeline-run-from-template).

## Porting Pipeline Template to Different Platforms

If the new platform offers a [KFP-conformant backend](https://www.kubeflow.org/docs/components/pipelines/overview/),
porting this pipeline template to another platform should be straightforward. The main modification involves adjusting
the data fetching logic in the `data_prep`, `train`, and `eval` components, where the `google.cloud.storage.Client` is
currently used.
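One way to keep that surface small is to hide the storage client behind a narrow interface, so only a thin adapter changes per platform. A sketch under that assumption (the `ObjectStore` protocol and `InMemoryStore` names are hypothetical, not part of this repository):

```python
from typing import Protocol


class ObjectStore(Protocol):
    """The minimal storage surface the pipeline components would rely on."""

    def list_paths(self, bucket: str) -> list[str]: ...

    def download(self, bucket: str, path: str) -> bytes: ...


class InMemoryStore:
    """Test double; a GCS adapter would wrap google.cloud.storage.Client the same way."""

    def __init__(self, objects: dict[str, bytes]) -> None:
        self._objects = objects

    def list_paths(self, bucket: str) -> list[str]:
        return sorted(self._objects)

    def download(self, bucket: str, path: str) -> bytes:
        return self._objects[path]


# Components depend on the protocol, never on a concrete cloud SDK.
store: ObjectStore = InMemoryStore({"rain/0.jpg": b"\x00"})
listing = store.list_paths("any-bucket")
```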

## Appendix

Other frameworks for building machine learning workflows besides Kubeflow Pipelines:

- [Flyte](https://github.com/flyteorg/flyte)
- [Metaflow](https://github.com/Netflix/metaflow)
- [Prefect](https://github.com/PrefectHQ/prefect)
18 changes: 18 additions & 0 deletions data_prep/Dockerfile
@@ -0,0 +1,18 @@
FROM python:3.12.5-slim AS builder

RUN pip install --upgrade pip==24.2.0 && \
pip install poetry==1.8.3

WORKDIR /app

COPY pyproject.toml poetry.toml poetry.lock ./

RUN poetry install

FROM python:3.12.5-slim AS runtime

WORKDIR /app

ENV PATH="/app/.venv/bin:$PATH"

COPY --from=builder /app/.venv .venv
File renamed without changes.
61 changes: 61 additions & 0 deletions data_prep/component_func.py
@@ -0,0 +1,61 @@
from kfp.dsl import Dataset, Metrics, Output


def prep_data(
    data_bucket: str,
    random_seed: int,
    train_ratio: float,
    test_ratio: float,
    val_ratio: float,
    train_split_info: Output[Dataset],
    val_split_info: Output[Dataset],
    test_split_info: Output[Dataset],
    metrics: Output[Metrics],
) -> None:
    import logging
    import time
    from collections import Counter
    from json import dump
    from math import isclose

    from google.cloud.storage import Client
    from sklearn.model_selection import train_test_split

    start_time = time.time()
    logging.info("Started data preparation task.")

    for val in (train_ratio, test_ratio, val_ratio):
        if val <= 0 or val >= 1:
            raise ValueError("Train, test and validation ratios must be in range (0, 1).")

    # isclose instead of ==: an exact float comparison would reject valid inputs
    # such as 0.7 + 0.2 + 0.1 because of rounding error.
    if not isclose(train_ratio + test_ratio + val_ratio, 1.0):
        raise ValueError("Train, test and validation ratios must sum up to 1.")

    client = Client()
    bucket = client.bucket(data_bucket)
    img_paths: list[str] = [blob.name for blob in client.list_blobs(bucket)]

    def derive_class(img_path: str) -> str:
        return img_path.split("/")[0]

    classes = [derive_class(path) for path in img_paths]
    x_train, x_val_test = train_test_split(img_paths, stratify=classes, test_size=1 - train_ratio, random_state=random_seed)

    val_test_classes = [derive_class(path) for path in x_val_test]
    x_val, x_test = train_test_split(
        x_val_test, stratify=val_test_classes, test_size=test_ratio / (test_ratio + val_ratio), random_state=random_seed
    )

    for name, data_set, artifact in (
        ("train", x_train, train_split_info),
        ("validation", x_val, val_split_info),
        ("test", x_test, test_split_info),
    ):
        metrics.log_metric(f"{name}Size", len(data_set))
        counts = Counter(map(derive_class, data_set))
        for key, value in counts.items():
            metrics.log_metric(f"{name}{key.capitalize()}", value)
        with open(artifact.path, "w") as f:
            dump(data_set, f)

    metrics.log_metric("timeTakenSeconds", round(time.time() - start_time, 2))
    logging.info("Successfully finished data preparation task.")
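A detail worth flagging for the ratio check in this component: comparing a sum of floats with `==` can reject valid inputs such as 0.7/0.2/0.1 due to binary rounding, so a tolerance-based comparison with `math.isclose` is the safer choice. A small illustration:

```python
from math import isclose

ratios = (0.7, 0.2, 0.1)
exact = sum(ratios) == 1.0            # False: binary floats accumulate rounding error
tolerant = isclose(sum(ratios), 1.0)  # True: compares within a relative tolerance
```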
