From 797bd4abbc7f32d997848efab8180c608fcc5408 Mon Sep 17 00:00:00 2001 From: Justin Yu Date: Thu, 15 Jun 2023 12:08:49 -0700 Subject: [PATCH] [Templates] Unify the batch inference template with an existing Data example (#36401) Signed-off-by: Justin Yu --- doc/source/data/examples.rst | 4 +- .../pytorch_resnet_batch_prediction.ipynb | 131 ++++-- .../templates/01_batch_inference/README.md | 6 +- .../templates/01_batch_inference/start.ipynb | 444 ++++++++++-------- 4 files changed, 354 insertions(+), 231 deletions(-) diff --git a/doc/source/data/examples.rst b/doc/source/data/examples.rst index 0915f85ae50b..0a3e326acbc5 100644 --- a/doc/source/data/examples.rst +++ b/doc/source/data/examples.rst @@ -30,9 +30,9 @@ Computer Vision .. button-ref:: examples/pytorch_resnet_batch_prediction - Image Classification Batch Inference with PyTorch ResNet18 + Image Classification Batch Inference with PyTorch ResNet152 + - .. grid-item-card:: .. button-ref:: examples/batch_inference_object_detection diff --git a/doc/source/data/examples/pytorch_resnet_batch_prediction.ipynb b/doc/source/data/examples/pytorch_resnet_batch_prediction.ipynb index 25c4a0e9e803..87ccd88ff49c 100644 --- a/doc/source/data/examples/pytorch_resnet_batch_prediction.ipynb +++ b/doc/source/data/examples/pytorch_resnet_batch_prediction.ipynb @@ -1,6 +1,7 @@ { "cells": [ { + "attachments": {}, "cell_type": "markdown", "metadata": { "id": "MNwt9bSG0hin" @@ -20,6 +21,7 @@ ] }, { + "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ @@ -38,6 +40,7 @@ ] }, { + "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ @@ -45,6 +48,7 @@ ] }, { + "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ @@ -102,15 +106,15 @@ ], "source": [ "import ray\n", - "from ray.data.datasource.partitioning import Partitioning\n", "\n", - "s3_uri = \"s3://anonymous@air-example-data-2/imagenette2/val/\"\n", + "s3_uri = \"s3://anonymous@air-example-data-2/imagenette2/train/\"\n", "\n", "ds = ray.data.read_images(s3_uri, mode=\"RGB\")\n", "ds\n" ] }, { + "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ @@ -138,10 +142,11 @@ } ], "source": [ - "ds.schema()" + "ds.schema()\n" ] }, { + "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ @@ -149,6 +154,7 @@ ] }, { + "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ @@ -165,10 +171,11 @@ }, "outputs": [], "source": [ - "single_batch = ds.take_batch(10)" + "single_batch = ds.take_batch(10)\n" ] }, { + "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ @@ -198,10 +205,11 @@ "from PIL import Image\n", "\n", "img = Image.fromarray(single_batch[\"image\"][0])\n", - "img" + "img\n" ] }, { + "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ @@ -216,22 +224,23 @@ }, "outputs": [], "source": [ - "from torchvision.models import ResNet18_Weights\n", + "from torchvision.models import ResNet152_Weights\n", "from torchvision import transforms\n", "from torchvision import models\n", "\n", - "weights = ResNet18_Weights.IMAGENET1K_V1\n", + "weights = ResNet152_Weights.IMAGENET1K_V1\n", "\n", "# Load the pretrained resnet model and move to GPU.\n", "# Remove the `.cuda()` if using CPU.\n", - "model = models.resnet18(weights=weights).cuda()\n", + "model = models.resnet152(weights=weights).cuda()\n", "model.eval()\n", "\n", "imagenet_transforms = weights.transforms\n", - "transform = transforms.Compose([transforms.ToTensor(), imagenet_transforms()])" + "transform = transforms.Compose([transforms.ToTensor(), imagenet_transforms()])\n" ] }, { + "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ @@ -276,11 +285,14 @@ " prediction_results = model(torch.stack(transformed_batch).cuda())\n", " classes = prediction_results.argmax(dim=1).cpu()\n", "\n", + "del model # Free up GPU memory\n", + "\n", "labels = [weights.meta[\"categories\"][i] for i in classes]\n", - "labels" + "labels\n" ] }, { + "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ @@ -288,6 +300,7 @@ ] }, { + "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ @@ -295,6 +308,7 @@ ] }, { + "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ @@ -302,6 +316,7 @@ ] }, { + "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ @@ -318,38 +333,37 @@ "source": [ "from typing import Any, Dict\n", "\n", + "\n", "def preprocess_image(row: Dict[str, Any]):\n", - " return {\"original_image\": row[\"image\"], \n", - " \"transformed_image\": transform(row[\"image\"]),}" + " return {\n", + " \"original_image\": row[\"image\"],\n", + " \"transformed_image\": transform(row[\"image\"]),\n", + " }\n" ] }, { + "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ - "Then we use the {meth}`map ` API to apply the function to the whole dataset. By using Ray Data's map, we can scale out the preprocessing to all the resources in our Ray cluster Note, the `map` method is lazy, it won't perform execution until we start to consume the results." + "Then we use the {meth}`map ` API to apply the function to the whole dataset. By using Ray Data's map, we can scale out the preprocessing to all the resources in our Ray cluster.\n", + "\n", + "Note: the `map` method is lazy, it won't perform execution until we consume the results." ] }, { "cell_type": "code", - "execution_count": 8, + "execution_count": null, "metadata": { "tags": [] }, - "outputs": [ - { - "name": "stderr", - "output_type": "stream", - "text": [ - "[2023-05-23 18:50:07] [Ray Data] WARNING ray.data.dataset::The `map`, `flat_map`, and `filter` operations are unvectorized and can be very slow. If you're using a vectorized transformation, consider using `.map_batches()` instead.\n" - ] - } - ], + "outputs": [], "source": [ - "transformed_ds = ds.map(preprocess_image)" + "transformed_ds = ds.map(preprocess_image)\n" ] }, { + "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ @@ -357,12 +371,13 @@ ] }, { + "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "Next, let's convert the model inference part. Compared with preprocessing, model inference has 2 differences:\n", "1. Model loading and initialization is usually expensive. \n", - "1. Model inference can be optimized with hardware acceleration if we process data in batches. Using larger batches improves GPU utilization and the overall runtime of the inference job.\n", + "2. Model inference can be optimized with hardware acceleration if we process data in batches. Using larger batches improves GPU utilization and the overall runtime of the inference job.\n", "\n", "Thus, we convert the model inference code to the following `ResnetModel` class. In this class, we put the expensive model loading and initialization code in the `__init__` constructor, which will run only once. And we put the model inference code in the `__call__` method, which will be called for each batch.\n", "\n", @@ -381,10 +396,11 @@ "import numpy as np\n", "import torch\n", "\n", + "\n", "class ResnetModel:\n", " def __init__(self):\n", - " self.weights = ResNet18_Weights.IMAGENET1K_V1\n", - " self.model = models.resnet18(self.weights).cuda()\n", + " self.weights = ResNet152_Weights.IMAGENET1K_V1\n", + " self.model = models.resnet152(weights=self.weights).cuda()\n", " self.model.eval()\n", "\n", " def __call__(self, batch: Dict[str, np.ndarray]):\n", @@ -396,8 +412,13 @@ " with torch.inference_mode():\n", " prediction = self.model(torch_batch)\n", " predicted_classes = prediction.argmax(dim=1).detach().cpu()\n", - " predicted_labels = [self.weights.meta[\"categories\"][i] for i in predicted_classes]\n", - " return {\"predicted_label\": predicted_labels, \"original_image\": batch[\"original_image\"]}" + " predicted_labels = [\n", + " self.weights.meta[\"categories\"][i] for i in predicted_classes\n", + " ]\n", + " return {\n", + " \"predicted_label\": predicted_labels,\n", + " \"original_image\": batch[\"original_image\"],\n", + " }\n" ] }, { @@ -405,11 +426,13 @@ "cell_type": "markdown", "metadata": {}, "source": [ - "Then we use the {meth}`map_batches ` API to apply the model to the whole dataset. \n", + "Then we use the {meth}`map ` API to apply the model to the whole dataset.\n", + "\n", + "The first parameter of `map_batches` is the user-defined function (UDF), which can either be a function or a class. Since we are using a class in this case, the UDF will run as long-running [Ray actors](actor-guide). For class-based UDFs, we use the `compute` argument to specify {class}`ActorPoolStrategy ` with the number of parallel actors.\n", "\n", - "The first parameter of {meth}`map_batches ` is the user-defined function (UDF), which can either be a function or a class. In this case, we use a class-based UDF which is run as long-running [Ray actors](https://docs.ray.io/en/latest/ray-core/key-concepts.html#actors). For class-based UDFs, we use the `compute` argument to specify {class}`ActorPoolStrategy ` with the number of parallel actors. \n", + "The `batch_size` argument indicates the number of images in each batch. See the Ray dashboard\n", + "for GPU memory usage to experiment with the `batch_size` when using your own model and dataset.\n", "\n", - "The `batch_size` argument indicates the number of images in each batch.\n", "The `num_gpus` argument specifies the number of GPUs needed for each `ResnetModel` instance. In this case, we want 1 GPU for each model replica. If you are doing CPU inference, you can remove the `num_gpus=1`." ] }, @@ -423,13 +446,16 @@ "source": [ "predictions = transformed_ds.map_batches(\n", " ResnetModel,\n", - " compute=ray.data.ActorPoolStrategy(size=4), # Use 4 GPUs. Change this number based on the number of GPUs in your cluster.\n", + " compute=ray.data.ActorPoolStrategy(\n", + " size=4\n", + " ), # Use 4 GPUs. Change this number based on the number of GPUs in your cluster.\n", " num_gpus=1, # Specify 1 GPU per model replica.\n", - " batch_size=1024 # Use the largest batch size that can fit on our GPUs\n", - ")" + " batch_size=720, # Use the largest batch size that can fit on our GPUs\n", + ")\n" ] }, { + "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ @@ -437,10 +463,11 @@ ] }, { + "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ - "Let's take a small batch and verify the results." + "Let's take a small batch of predictions and verify the results." ] }, { @@ -451,10 +478,11 @@ }, "outputs": [], "source": [ - "prediction_batch = predictions.take_batch(5)" + "prediction_batch = predictions.take_batch(5)\n" ] }, { + "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ @@ -557,23 +585,40 @@ "source": [ "from PIL import Image\n", "\n", - "for image, prediction in zip(prediction_batch[\"original_image\"], prediction_batch[\"predicted_label\"]):\n", + "for image, prediction in zip(\n", + " prediction_batch[\"original_image\"], prediction_batch[\"predicted_label\"]\n", + "):\n", " img = Image.fromarray(image)\n", " display(img)\n", - " print(\"Label: \", prediction)" + " print(\"Label: \", prediction)\n" ] }, { + "attachments": {}, "cell_type": "markdown", "metadata": { "tags": [] }, "source": [ - "If the samples look good, we can proceed with saving the results to an external storage, e.g., S3 or local disks. See [Ray Data Input/Output](https://docs.ray.io/en/latest/data/api/input_output.html) for all supported stoarges and file formats.\n", + "If the samples look good, we can proceed with saving the results to an external storage, e.g., S3 or local disks. See [the guide on saving data](https://docs.ray.io/en/latest/data/saving-data.html) for all supported storage and file formats.\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "import tempfile\n", + "\n", + "temp_dir = tempfile.mkdtemp()\n", + "\n", + "# Don't save the images as part of the predictions.\n", + "predictions = predictions.drop_columns([\"original_image\"])\n", "\n", - "```python\n", - "ds.write_parquet(\"local://tmp/inference_results\")\n", - "```" + "# The `local://` prefix is need to make sure all results get written on the head node.\n", + "predictions.write_parquet(f\"local://{temp_dir}\")\n", + "print(f\"Predictions saved to `{temp_dir}`!\")\n" ] } ], diff --git a/doc/source/templates/01_batch_inference/README.md b/doc/source/templates/01_batch_inference/README.md index 9ddfc7bf782f..636ea5c3ba8a 100644 --- a/doc/source/templates/01_batch_inference/README.md +++ b/doc/source/templates/01_batch_inference/README.md @@ -2,8 +2,8 @@ | Template Specification | Description | | ---------------------- | ----------- | -| Summary | This template walks through GPU batch inference on an image dataset using a PyTorch ResNet model. | -| Time to Run | Less than 2 minutes to compute predictions on the dataset. | +| Summary | This template walks through GPU batch inference on an image dataset. | +| Time to Run | Less than 5 minutes to compute predictions on the dataset. | | Minimum Compute Requirements | No hard requirements. The default is 4 nodes, each with 1 NVIDIA T4 GPU. | | Cluster Environment | This template uses the latest Anyscale-provided Ray ML image using Python 3.9: [`anyscale/ray-ml:latest-py39-gpu`](https://docs.anyscale.com/reference/base-images/overview). If you want to change to a different cluster environment, make sure that it is based off of this image! | @@ -11,4 +11,4 @@ **When the workspace is up and running, start coding by clicking on the Jupyter or VSCode icon above. Open the `start.ipynb` file and follow the instructions there.** -By the end, we will have classified around 4000 images using the pre-trained ResNet model and saved these predictions to a local directory. +By the end, we will have classified around 10k images with a PyTorch model. diff --git a/doc/source/templates/01_batch_inference/start.ipynb b/doc/source/templates/01_batch_inference/start.ipynb index e21bea50f3b9..08c8d8758d39 100644 --- a/doc/source/templates/01_batch_inference/start.ipynb +++ b/doc/source/templates/01_batch_inference/start.ipynb @@ -3,400 +3,478 @@ { "attachments": {}, "cell_type": "markdown", - "id": "6fbc3e3c", "metadata": {}, "source": [ "# Scaling Batch Inference with Ray Data\n", "\n", "| Template Specification | Description |\n", "| ---------------------- | ----------- |\n", - "| Summary | This template walks through GPU batch inference on an image dataset using a PyTorch ResNet model. |\n", - "| Time to Run | Less than 2 minutes to compute predictions on the dataset. |\n", + "| Summary | This template walks through GPU batch inference on an image dataset. |\n", + "| Time to Run | Less than 5 minutes to compute predictions on the dataset. |\n", "| Minimum Compute Requirements | No hard requirements. The default is 4 nodes, each with 1 NVIDIA T4 GPU. |\n", - "| Cluster Environment | This template uses the latest Anyscale-provided Ray ML image using Python 3.9: [`anyscale/ray-ml:latest-py39-gpu`](https://docs.anyscale.com/reference/base-images/overview). If you want to change to a different cluster environment, make sure that it is based off of this image! |\n", - "\n", - "By the end, we will have classified > 3000 images using the pre-trained ResNet model and saved these predictions to a local directory.\n", + "| Cluster Environment | This template uses the latest Anyscale-provided Ray ML image using Python 3.9: [`anyscale/ray-ml:latest-py39-gpu`](https://docs.anyscale.com/reference/base-images/overview). If you want to change to a different cluster environment, make sure that it is based off of this image! |\n" + ] + }, + { + "attachments": {}, + "cell_type": "markdown", + "metadata": {}, + "source": [ + "In this example, we will introduce how to use the [Ray Data](https://docs.ray.io/en/latest/data/data.html) for **large-scale image classification batch inference with multiple GPU workers.**\n", "\n", - "> Slot in your code below wherever you see the ✂️ icon to build off of this template!\n", - ">\n", - "> The framework and data format used in this template can be easily replaced to suit your own application!\n", + "In particular, we will:\n", + "- Load Imagenette dataset from S3 bucket and create a [Ray `Dataset`](https://docs.ray.io/en/latest/data/api/doc/ray.data.Dataset.html).\n", + "- Load a pretrained ResNet model.\n", + "- Use [Ray Data](https://docs.ray.io/en/latest/data/data.html) to preprocess the dataset and do model inference parallelizing across multiple GPUs\n", + "- Evaluate the predictions and save results to S3/local disk.\n", "\n", - "We'll start with some imports:" + "This example will still work even if you do not have GPUs available, but overall performance will be slower." + ] + }, + { + "attachments": {}, + "cell_type": "markdown", + "metadata": {}, + "source": [ + "The default cluster environment used by this template already has all the dependencies\n", + "needed to run. If you're using a custom cluster environment, you'll need to install\n", + "`torch` and `torchvision` and restart the notebook." ] }, { "cell_type": "code", "execution_count": null, - "id": "065e7765", - "metadata": {}, + "metadata": { + "tags": [] + }, "outputs": [], "source": [ - "import matplotlib.pyplot as plt\n", - "import numpy as np\n", - "import tempfile\n", - "from typing import Dict\n", - "\n", - "import ray\n" + "!pip install torch torchvision" ] }, { "attachments": {}, "cell_type": "markdown", - "id": "0edfc6e2", "metadata": {}, "source": [ - "## Load the dataset" + "## Step 1: Reading the Dataset from S3" ] }, { "attachments": {}, "cell_type": "markdown", - "id": "3b6f2352", "metadata": {}, "source": [ - "> ✂️ Replace this function with logic to load your own data with Ray Data.\n", - ">\n", - "> See [the Ray Data guide on creating datasets](https://docs.ray.io/en/latest/data/creating-datasets.html) to learn how to create a dataset based on the data type and how file storage format." + "[Imagenette](https://github.com/fastai/imagenette) is a subset of Imagenet with 10 classes. We have this dataset hosted publicly in an S3 bucket. Since we are only doing inference here, we load in just the validation split.\n", + "\n", + "Here, we use [`ray.data.read_images`](https://docs.ray.io/en/latest/data/api/doc/ray.data.read_images.html) to load the validation set from S3. Ray Data also supports reading from a variety of other [datasources and formats](https://docs.ray.io/en/latest/data/loading-data.html)." ] }, { "cell_type": "code", "execution_count": null, - "id": "615f4a78", - "metadata": {}, + "metadata": { + "colab": { + "referenced_widgets": [ + "217255c5a2ba4ec5890f6f3667f5b429" + ] + }, + "id": "6i15qjnH0hin", + "outputId": "c22aaba0-b33a-40f5-cf89-a70847098af2", + "tags": [] + }, "outputs": [], "source": [ - "def load_ray_dataset():\n", - " from ray.data.datasource.partitioning import Partitioning\n", + "import ray\n", + "\n", + "s3_uri = \"s3://anonymous@air-example-data-2/imagenette2/train/\"\n", "\n", - " s3_uri = \"s3://anonymous@air-example-data-2/imagenette2/val/\"\n", - " partitioning = Partitioning(\"dir\", field_names=[\"class\"], base_dir=s3_uri)\n", - " ds = ray.data.read_images(\n", - " s3_uri, size=(256, 256), partitioning=partitioning, mode=\"RGB\"\n", - " )\n", - " return ds\n" + "ds = ray.data.read_images(s3_uri, mode=\"RGB\")\n", + "ds\n" + ] + }, + { + "attachments": {}, + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Inspecting the schema, we can see that there is 1 column in the dataset containing the images stored as Numpy arrays." ] }, { "cell_type": "code", "execution_count": null, - "id": "966bcfdc", - "metadata": {}, + "metadata": { + "tags": [] + }, "outputs": [], "source": [ - "ds = load_ray_dataset()\n" + "ds.schema()\n" + ] + }, + { + "attachments": {}, + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Step 2: Inference on a single batch" ] }, { "attachments": {}, "cell_type": "markdown", - "id": "1f183c51", "metadata": {}, "source": [ - "Let's inspect the first few images of our dataset. We'll use a pre-trained ResNet model\n", - "to classify these images." + "Next, we can do inference on a single batch of data, using a pre-trained ResNet152 model and following [this PyTorch example](https://pytorch.org/vision/main/models.html#classification).\n", + "\n", + "Let’s get a batch of 10 from our dataset. Each image in the batch is represented as a Numpy array." ] }, { "cell_type": "code", "execution_count": null, - "id": "7f51ee52", - "metadata": {}, + "metadata": { + "tags": [] + }, "outputs": [], "source": [ - "sample_images = [sample[\"image\"] for sample in ds.take(5)]\n", - "\n", - "_, axs = plt.subplots(1, 5, figsize=(10, 5))\n", - "\n", - "for i, image in enumerate(sample_images):\n", - " axs[i].imshow(image)\n", - " axs[i].axis(\"off\")\n" + "single_batch = ds.take_batch(10)\n" ] }, { "attachments": {}, "cell_type": "markdown", - "id": "a7671aa0", "metadata": {}, "source": [ - "## Preprocess the dataset\n", + "We can visualize 1 image from this batch." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "from PIL import Image\n", "\n", - "We may need to preprocess the dataset before passing it to the model.\n", - "This just amounts to writing a function that performs the preprocessing logic, and then\n", - "applying the function to the entire dataset with a call to `map_batches`." + "img = Image.fromarray(single_batch[\"image\"][0])\n", + "img\n" ] }, { "attachments": {}, "cell_type": "markdown", - "id": "39d01e3c", "metadata": {}, "source": [ - "> ✂️ Replace this function with your own data preprocessing logic." + "Now, let’s download a pre-trained PyTorch Resnet model and get the required preprocessing transforms to preprocess the images prior to prediction." ] }, { "cell_type": "code", "execution_count": null, - "id": "652121bd", - "metadata": {}, + "metadata": { + "tags": [] + }, "outputs": [], "source": [ - "def preprocess(batch: Dict[str, np.ndarray]) -> Dict[str, np.ndarray]:\n", - " import torch\n", - " from torchvision import transforms\n", + "from torchvision.models import ResNet152_Weights\n", + "from torchvision import transforms\n", + "from torchvision import models\n", + "\n", + "weights = ResNet152_Weights.IMAGENET1K_V1\n", "\n", - " def to_tensor(batch: np.ndarray) -> torch.Tensor:\n", - " tensor = torch.as_tensor(batch, dtype=torch.float)\n", - " # (B, H, W, C) -> (B, C, H, W)\n", - " tensor = tensor.permute(0, 3, 1, 2).contiguous()\n", - " # [0., 255.] -> [0., 1.]\n", - " tensor = tensor.div(255)\n", - " return tensor\n", + "# Load the pretrained resnet model and move to GPU.\n", + "# Remove the `.cuda()` if using CPU.\n", + "model = models.resnet152(weights=weights).cuda()\n", + "model.eval()\n", "\n", - " transform = transforms.Compose(\n", - " [\n", - " transforms.Lambda(to_tensor),\n", - " transforms.CenterCrop(224),\n", - " transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),\n", - " ]\n", - " )\n", - " return {\"image\": transform(batch[\"image\"]).numpy()}\n" + "imagenet_transforms = weights.transforms\n", + "transform = transforms.Compose([transforms.ToTensor(), imagenet_transforms()])\n" + ] + }, + { + "attachments": {}, + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Then, we apply the transforms to our batch of images, and pass the batch to the model for inference, making sure to use the GPU device for inference.\n", + "\n", + "All of the images in the batch have been correctly classified as \"tench\" which is a type of fish." ] }, { "cell_type": "code", "execution_count": null, - "id": "c35f5a17", - "metadata": {}, + "metadata": { + "tags": [] + }, "outputs": [], "source": [ - "ds = ds.map_batches(preprocess, batch_format=\"numpy\")\n", + "import torch\n", + "\n", + "transformed_batch = [transform(image) for image in single_batch[\"image\"]]\n", + "with torch.inference_mode():\n", + " # Remove the `.cuda()` if doing inference on CPUs.\n", + " prediction_results = model(torch.stack(transformed_batch).cuda())\n", + " classes = prediction_results.argmax(dim=1).cpu()\n", "\n", - "print(\"Dataset schema:\\n\", ds.schema())\n", - "print(\"Number of images:\", ds.count())\n" + "del model # Free up GPU memory\n", + "\n", + "labels = [weights.meta[\"categories\"][i] for i in classes]\n", + "labels\n" ] }, { "attachments": {}, "cell_type": "markdown", - "id": "eaa3653b", "metadata": {}, "source": [ - "## Set up your model for inference\n", - "\n", - "Define a class that loads the model on initialization, and also performs inference with the loaded model whenever the class is called (by implementing `__call__`)." + "## Step 3: Scaling up to the full Dataset with Ray Data" ] }, { "attachments": {}, "cell_type": "markdown", - "id": "ad059e54", "metadata": {}, "source": [ - "> ✂️ Replace parts of this callable class with your own model initialization and inference logic." + "By using Ray Data, we can apply the same logic in the previous section to scale up to the entire dataset, leveraging all the GPUs in our cluster." + ] + }, + { + "attachments": {}, + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Preprocessing" + ] + }, + { + "attachments": {}, + "cell_type": "markdown", + "metadata": {}, + "source": [ + "First let's convert the preprocessing code to Ray Data. We'll package the preprocessing code within a `preprocess_image` function. This function should take only one argument, which is a dict that contains a single image in the dataset, represented as a numpy array. We use the same `transform` function that was defined above and store the transformed image in a new `transformed_image` field." ] }, { "cell_type": "code", "execution_count": null, - "id": "42cac828", - "metadata": {}, + "metadata": { + "tags": [] + }, "outputs": [], "source": [ - "class PredictCallable:\n", - " def __init__(self):\n", - " # \n", - " import torch\n", - " from torchvision import models\n", - " from torchvision.models import ResNet152_Weights\n", - "\n", - " self.model = models.resnet152(weights=ResNet152_Weights.DEFAULT)\n", - " self.model.eval()\n", - " self.device = torch.device(\"cuda\" if torch.cuda.is_available() else \"cpu\")\n", - " self.model.to(self.device)\n", + "from typing import Any, Dict\n", "\n", - " def __call__(self, batch: Dict[str, np.ndarray]) -> Dict[str, np.ndarray]:\n", - " # \n", - " import torch\n", "\n", - " input_data = torch.as_tensor(batch[\"image\"], device=self.device)\n", - " with torch.inference_mode():\n", - " pred = self.model(input_data)\n", - " return {\"predicted_class_index\": pred.argmax(dim=1).detach().cpu().numpy()}\n" + "def preprocess_image(row: Dict[str, Any]):\n", + " return {\n", + " \"original_image\": row[\"image\"],\n", + " \"transformed_image\": transform(row[\"image\"]),\n", + " }\n" ] }, { "attachments": {}, "cell_type": "markdown", - "id": "2d63f352", "metadata": {}, "source": [ - "## Run batch inference\n", - "\n", - "We'll first configure the number of workers and the resource requirements of each worker.\n", + "Then we use the [`map`](https://docs.ray.io/en/latest/data/api/doc/ray.data.Dataset.map.html) API to apply the preprocessing to the whole dataset. By using Ray Data's map, we can scale out the preprocessing to all the resources in our Ray cluster.\n", "\n", - "These defaults will assume that your cluster has 4 GPUs available.\n", - "Be sure to stay within the resource constraints of your Ray Cluster if autoscaling is not enabled.\n" + "Note: the `map` method is lazy, it won't perform execution until we consume the results." ] }, { "cell_type": "code", "execution_count": null, - "id": "9d49681f-baf0-4ed8-9740-5c4e38744311", "metadata": { "tags": [] }, "outputs": [], "source": [ - "NUM_WORKERS: int = 4\n", - "USE_GPU: bool = True\n" + "transformed_ds = ds.map(preprocess_image)\n" ] }, { - "cell_type": "code", - "execution_count": null, - "id": "419658c0", + "attachments": {}, + "cell_type": "markdown", "metadata": {}, - "outputs": [], "source": [ - "if USE_GPU and NUM_WORKERS > ray.available_resources()[\"GPU\"]:\n", - " print(\n", - " \"Your cluster does not currently have enough resources to run with these settings. \"\n", - " \"Consider decreasing the number of workers, decreasing the resources needed \"\n", - " \"per worker, or restarting the cluster with more GPU nodes.\"\n", - " \"Ignore this if your cluster auto-scales.\"\n", - " )\n" + "### Model Inference" ] }, { "attachments": {}, "cell_type": "markdown", - "id": "3d170d2b", "metadata": {}, "source": [ - "You can check the available resources in your Ray Cluster with:" + "Next, let's convert the model inference part. Compared with preprocessing, model inference has 2 differences:\n", + "1. Model loading and initialization is usually expensive. \n", + "2. Model inference can be optimized with hardware acceleration if we process data in batches. Using larger batches improves GPU utilization and the overall runtime of the inference job.\n", + "\n", + "Thus, we convert the model inference code to the following `ResnetModel` class. In this class, we put the expensive model loading and initialization code in the `__init__` constructor, which will run only once. And we put the model inference code in the `__call__` method, which will be called for each batch.\n", + "\n", + "The `__call__` method takes a batch of data items, instead of a single one. In this case, the batch is also a dict that has one key named \"image\", and the value is a Numpy array of images represented in `np.ndarray` format. We reuse the same inferencing logic from step 2." ] }, { "cell_type": "code", "execution_count": null, - "id": "f66e12d1", - "metadata": {}, + "metadata": { + "tags": [] + }, "outputs": [], "source": [ - "!ray status" + "from typing import Dict\n", + "import numpy as np\n", + "import torch\n", + "\n", + "\n", + "class ResnetModel:\n", + " def __init__(self):\n", + " self.weights = ResNet152_Weights.IMAGENET1K_V1\n", + " self.model = models.resnet152(weights=self.weights).cuda()\n", + " self.model.eval()\n", + "\n", + " def __call__(self, batch: Dict[str, np.ndarray]):\n", + " # Convert the numpy array of images into a PyTorch tensor.\n", + " torch_batch = torch.from_numpy(batch[\"transformed_image\"])\n", + " # Move the tensor batch to GPU if available.\n", + " if torch.cuda.is_available():\n", + " torch_batch = torch_batch.cuda()\n", + " with torch.inference_mode():\n", + " prediction = self.model(torch_batch)\n", + " predicted_classes = prediction.argmax(dim=1).detach().cpu()\n", + " predicted_labels = [\n", + " self.weights.meta[\"categories\"][i] for i in predicted_classes\n", + " ]\n", + " return {\n", + " \"predicted_label\": predicted_labels,\n", + " \"original_image\": batch[\"original_image\"],\n", + " }\n" ] }, { "attachments": {}, "cell_type": "markdown", - "id": "89dff216", "metadata": {}, "source": [ - "Now, use Ray Data to perform batch inference using `NUM_WORKERS` copies of the `PredictCallable` class you defined." + "Then we use the [`map_batches`](https://docs.ray.io/en/latest/data/api/doc/ray.data.Dataset.map_batches.html) API to apply the model to the whole dataset.\n", + "\n", + "The first parameter of `map_batches` is the user-defined function (UDF), which can either be a function or a class. Since we are using a class in this case, the UDF will run as long-running [Ray actors](https://docs.ray.io/en/latest/ray-core/actors.html). For class-based UDFs, we use the `compute` argument to specify [`ActorPoolStrategy`](https://docs.ray.io/en/latest/data/api/doc/ray.data.ActorPoolStrategy.html) with the number of parallel actors.\n", + "\n", + "The `batch_size` argument indicates the number of images in each batch. See the Ray dashboard\n", + "for GPU memory usage to experiment with the `batch_size` when using your own model and dataset.\n", + "\n", + "The `num_gpus` argument specifies the number of GPUs needed for each `ResnetModel` instance. In this case, we want 1 GPU for each model replica. If you are doing CPU inference, you can remove the `num_gpus=1`." ] }, { "cell_type": "code", "execution_count": null, - "id": "331e21e4", - "metadata": {}, + "metadata": { + "tags": [] + }, "outputs": [], "source": [ - "predictions = ds.map_batches(\n", - " PredictCallable,\n", - " batch_size=128,\n", - " compute=ray.data.ActorPoolStrategy(size=NUM_WORKERS),\n", - " num_gpus=1 if USE_GPU else 0,\n", - " batch_format=\"numpy\",\n", - ")\n", - "\n", - "preds = predictions.materialize()\n" + "predictions = transformed_ds.map_batches(\n", + " ResnetModel,\n", + " compute=ray.data.ActorPoolStrategy(\n", + " size=4\n", + " ), # Use 4 GPUs. Change this number based on the number of GPUs in your cluster.\n", + " num_gpus=1, # Specify 1 GPU per model replica.\n", + " batch_size=720, # Use the largest batch size that can fit on our GPUs\n", + ")\n" ] }, { "attachments": {}, "cell_type": "markdown", - "id": "05c48946", "metadata": {}, "source": [ - "## View the predictions" + "### Verify and Save Results" ] }, { "attachments": {}, "cell_type": "markdown", - "id": "2565ba08", "metadata": {}, "source": [ - "Show the first few predictions, which will show the predicted class labels of the images shown earlier! These first few predictions should show index 0, which maps to the class label `\"tench\"` (a type of fish)." + "Let's take a small batch of predictions and verify the results." ] }, { "cell_type": "code", "execution_count": null, - "id": "8d606556", - "metadata": {}, + "metadata": { + "tags": [] + }, "outputs": [], "source": [ - "preds.take(5)\n" + "prediction_batch = predictions.take_batch(5)" ] }, { "attachments": {}, "cell_type": "markdown", - "id": "90ec67e8", "metadata": {}, "source": [ - "Shard the predictions into a few partitions, and save each partition to a file.\n", - "\n", - "This currently saves to the local filesystem under a temporary directory, but you could also save to a cloud bucket (e.g., `s3://predictions-bucket`)." + "We see that all the images are correctly classified as \"tench\", which is a type of fish." ] }, { "cell_type": "code", "execution_count": null, - "id": "c1887e34", - "metadata": {}, + "metadata": { + "tags": [] + }, "outputs": [], "source": [ - "num_shards = 3\n", - "\n", - "temp_dir = tempfile.mkdtemp()\n", + "from PIL import Image\n", "\n", - "# The `local://` prefix is need to make sure all results get written on the head node.\n", - "predictions.repartition(num_shards).write_parquet(f\"local://{temp_dir}\")\n", - "print(f\"Predictions saved to `{temp_dir}`!\")\n" + "for image, prediction in zip(\n", + " prediction_batch[\"original_image\"], prediction_batch[\"predicted_label\"]\n", + "):\n", + " img = Image.fromarray(image)\n", + " display(img)\n", + " print(\"Label: \", prediction)\n" ] }, { "attachments": {}, "cell_type": "markdown", - "id": "3b7b5d91", - "metadata": {}, + "metadata": { + "tags": [] + }, "source": [ - "## Summary\n", - "\n", - "This template used [Ray Data](https://docs.ray.io/en/latest/data/dataset.html) to scale out batch inference. Ray Data is one of many libraries under the [Ray AI Runtime](https://docs.ray.io/en/latest/ray-air/getting-started.html). See [this blog post](https://www.anyscale.com/blog/model-batch-inference-in-ray-actors-actorpool-and-datasets) for more details on batch inference with Ray!\n", - "\n", - "At a high level, this template showed how to:\n", - "1. [Load your dataset using Ray Data.](https://docs.ray.io/en/latest/data/loading-data.html)\n", - "2. [Preprocess your dataset before feeding it to your model.](https://docs.ray.io/en/latest/data/transforming-data.html)\n", - "3. [Initialize your model and perform inference on a shard of your dataset with a remote actor.](https://docs.ray.io/en/latest/data/transforming-data.html#reduce-setup-overheads-using-actors)\n", - "4. [Save your prediction results.](https://docs.ray.io/en/latest/data/api/input_output.html)\n", - "\n" + "If the samples look good, we can proceed with saving the results to an external storage, e.g., S3 or local disks. See [the guide on saving data](https://docs.ray.io/en/latest/data/saving-data.html) for all supported storage and file formats.\n" ] }, { - "attachments": {}, - "cell_type": "markdown", - "id": "d1658235", + "cell_type": "code", + "execution_count": null, "metadata": {}, - "source": [] + "outputs": [], + "source": [ + "import tempfile\n", + "\n", + "temp_dir = tempfile.mkdtemp()\n", + "\n", + "# Don't save the images as part of the predictions.\n", + "predictions = predictions.drop_columns([\"original_image\"])\n", + "\n", + "# The `local://` prefix is need to make sure all results get written on the head node.\n", + "predictions.write_parquet(f\"local://{temp_dir}\")\n", + "print(f\"Predictions saved to `{temp_dir}`!\")\n" + ] } ], "metadata": { + "colab": { + "provenance": [] + }, "kernelspec": { "display_name": "Python 3 (ipykernel)", "language": "python", @@ -412,14 +490,14 @@ "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", - "version": "3.8.13" + "version": "3.10.8" }, "vscode": { "interpreter": { - "hash": "265d195fda5292fe8f69c6e37c435a5634a1ed3b6799724e66a975f68fa21517" + "hash": "a8c1140d108077f4faeb76b2438f85e4ed675f93d004359552883616a1acd54c" } } }, "nbformat": 4, - "nbformat_minor": 5 + "nbformat_minor": 4 }