[Templates] Unify the batch inference template with an existing Data example (#36401)

Signed-off-by: Justin Yu <justinvyu@anyscale.com>
justinvyu committed Jun 15, 2023
1 parent dbfe420 commit 797bd4a
Showing 4 changed files with 354 additions and 231 deletions.
4 changes: 2 additions & 2 deletions doc/source/data/examples.rst
@@ -30,9 +30,9 @@ Computer Vision

.. button-ref:: examples/pytorch_resnet_batch_prediction

Image Classification Batch Inference with PyTorch ResNet18
Image Classification Batch Inference with PyTorch ResNet152



.. grid-item-card::

.. button-ref:: examples/batch_inference_object_detection
131 changes: 88 additions & 43 deletions doc/source/data/examples/pytorch_resnet_batch_prediction.ipynb
@@ -1,6 +1,7 @@
{
"cells": [
{
"attachments": {},
"cell_type": "markdown",
"metadata": {
"id": "MNwt9bSG0hin"
@@ -20,6 +21,7 @@
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {},
"source": [
@@ -38,13 +40,15 @@
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {},
"source": [
"## Step 1: Reading the Dataset from S3"
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {},
"source": [
@@ -102,15 +106,15 @@
],
"source": [
"import ray\n",
"from ray.data.datasource.partitioning import Partitioning\n",
"\n",
"s3_uri = \"s3://anonymous@air-example-data-2/imagenette2/val/\"\n",
"s3_uri = \"s3://anonymous@air-example-data-2/imagenette2/train/\"\n",
"\n",
"ds = ray.data.read_images(s3_uri, mode=\"RGB\")\n",
"ds\n"
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {},
"source": [
@@ -138,17 +142,19 @@
}
],
"source": [
"ds.schema()"
"ds.schema()\n"
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {},
"source": [
"## Step 2: Inference on a single batch"
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {},
"source": [
@@ -165,10 +171,11 @@
},
"outputs": [],
"source": [
"single_batch = ds.take_batch(10)"
"single_batch = ds.take_batch(10)\n"
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {},
"source": [
@@ -198,10 +205,11 @@
"from PIL import Image\n",
"\n",
"img = Image.fromarray(single_batch[\"image\"][0])\n",
"img"
"img\n"
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {},
"source": [
@@ -216,22 +224,23 @@
},
"outputs": [],
"source": [
"from torchvision.models import ResNet18_Weights\n",
"from torchvision.models import ResNet152_Weights\n",
"from torchvision import transforms\n",
"from torchvision import models\n",
"\n",
"weights = ResNet18_Weights.IMAGENET1K_V1\n",
"weights = ResNet152_Weights.IMAGENET1K_V1\n",
"\n",
"# Load the pretrained resnet model and move to GPU.\n",
"# Remove the `.cuda()` if using CPU.\n",
"model = models.resnet18(weights=weights).cuda()\n",
"model = models.resnet152(weights=weights).cuda()\n",
"model.eval()\n",
"\n",
"imagenet_transforms = weights.transforms\n",
"transform = transforms.Compose([transforms.ToTensor(), imagenet_transforms()])"
"transform = transforms.Compose([transforms.ToTensor(), imagenet_transforms()])\n"
]
},
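[Editor's note: before mapping the composed `transform` over the whole dataset, it can be sanity-checked on one image. A minimal sketch, not part of the original notebook, reusing `transform` and `single_batch` from the cells above; for this ImageNet preset the output should be a normalized 3×224×224 tensor.]

```python
# Sanity-check the composed transform on a single image.
# Assumes `transform` and `single_batch` from the cells above.
sample = transform(single_batch["image"][0])
print(sample.shape)  # Expected: torch.Size([3, 224, 224]) for this preset
```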
{
"attachments": {},
"cell_type": "markdown",
"metadata": {},
"source": [
@@ -276,32 +285,38 @@
" prediction_results = model(torch.stack(transformed_batch).cuda())\n",
" classes = prediction_results.argmax(dim=1).cpu()\n",
"\n",
"del model # Free up GPU memory\n",
"\n",
"labels = [weights.meta[\"categories\"][i] for i in classes]\n",
"labels"
"labels\n"
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {},
"source": [
"## Step 3: Scaling up to the full Dataset with Ray Data"
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {},
"source": [
"By using Ray Data, we can apply the same logic in the previous section to scale up to the entire dataset, leveraging all the GPUs in our cluster."
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {},
"source": [
"### Preprocessing"
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {},
"source": [
@@ -318,51 +333,51 @@
"source": [
"from typing import Any, Dict\n",
"\n",
"\n",
"def preprocess_image(row: Dict[str, Any]):\n",
" return {\"original_image\": row[\"image\"], \n",
" \"transformed_image\": transform(row[\"image\"]),}"
" return {\n",
" \"original_image\": row[\"image\"],\n",
" \"transformed_image\": transform(row[\"image\"]),\n",
" }\n"
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {},
"source": [
"Then we use the {meth}`map <ds.data.Dataset.map>` API to apply the function to the whole dataset. By using Ray Data's map, we can scale out the preprocessing to all the resources in our Ray cluster Note, the `map` method is lazy, it won't perform execution until we start to consume the results."
"Then we use the {meth}`map <ds.data.Dataset.map>` API to apply the function to the whole dataset. By using Ray Data's map, we can scale out the preprocessing to all the resources in our Ray cluster.\n",
"\n",
"Note: the `map` method is lazy, it won't perform execution until we consume the results."
]
},
{
"cell_type": "code",
"execution_count": 8,
"execution_count": null,
"metadata": {
"tags": []
},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
"[2023-05-23 18:50:07] [Ray Data] WARNING ray.data.dataset::The `map`, `flat_map`, and `filter` operations are unvectorized and can be very slow. If you're using a vectorized transformation, consider using `.map_batches()` instead.\n"
]
}
],
"outputs": [],
"source": [
"transformed_ds = ds.map(preprocess_image)"
"transformed_ds = ds.map(preprocess_image)\n"
]
},
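[Editor's note: a minimal sketch, assuming `transformed_ds` from the cell above, showing that consuming even a single row triggers the lazy `map`; Ray Data streams execution and only processes what is needed to satisfy the request.]

```python
# Consuming output triggers execution of the lazy `map` above.
row = transformed_ds.take(1)[0]
print(row["transformed_image"].shape)
```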
{
"attachments": {},
"cell_type": "markdown",
"metadata": {},
"source": [
"### Model Inference"
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {},
"source": [
"Next, let's convert the model inference part. Compared with preprocessing, model inference has 2 differences:\n",
"1. Model loading and initialization is usually expensive. \n",
"1. Model inference can be optimized with hardware acceleration if we process data in batches. Using larger batches improves GPU utilization and the overall runtime of the inference job.\n",
"2. Model inference can be optimized with hardware acceleration if we process data in batches. Using larger batches improves GPU utilization and the overall runtime of the inference job.\n",
"\n",
"Thus, we convert the model inference code to the following `ResnetModel` class. In this class, we put the expensive model loading and initialization code in the `__init__` constructor, which will run only once. And we put the model inference code in the `__call__` method, which will be called for each batch.\n",
"\n",
@@ -381,10 +396,11 @@
"import numpy as np\n",
"import torch\n",
"\n",
"\n",
"class ResnetModel:\n",
" def __init__(self):\n",
" self.weights = ResNet18_Weights.IMAGENET1K_V1\n",
" self.model = models.resnet18(self.weights).cuda()\n",
" self.weights = ResNet152_Weights.IMAGENET1K_V1\n",
" self.model = models.resnet152(weights=self.weights).cuda()\n",
" self.model.eval()\n",
"\n",
" def __call__(self, batch: Dict[str, np.ndarray]):\n",
@@ -396,20 +412,27 @@
" with torch.inference_mode():\n",
" prediction = self.model(torch_batch)\n",
" predicted_classes = prediction.argmax(dim=1).detach().cpu()\n",
" predicted_labels = [self.weights.meta[\"categories\"][i] for i in predicted_classes]\n",
" return {\"predicted_label\": predicted_labels, \"original_image\": batch[\"original_image\"]}"
" predicted_labels = [\n",
" self.weights.meta[\"categories\"][i] for i in predicted_classes\n",
" ]\n",
" return {\n",
" \"predicted_label\": predicted_labels,\n",
" \"original_image\": batch[\"original_image\"],\n",
" }\n"
]
},
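[Editor's note: because the UDF is a plain Python class, it can be smoke-tested locally before handing it to Ray Data. This is a hedged sketch, not from the original notebook: it assumes a GPU is available (since `ResnetModel` calls `.cuda()`) and reuses `single_batch` and `transform` from earlier.]

```python
import numpy as np

# Build a tiny preprocessed batch in the same format `preprocess_image` emits.
test_batch = {
    "original_image": single_batch["image"][:2],
    "transformed_image": np.stack(
        [transform(img).numpy() for img in single_batch["image"][:2]]
    ),
}

local_model = ResnetModel()  # Loads the weights once, like each Ray actor would.
print(local_model(test_batch)["predicted_label"])
```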
{
"attachments": {},
"cell_type": "markdown",
"metadata": {},
"source": [
"Then we use the {meth}`map_batches <ray.data.Dataset.map_batches>` API to apply the model to the whole dataset. \n",
"Then we use the {meth}`map <ds.data.Dataset.map>` API to apply the model to the whole dataset.\n",
"\n",
"The first parameter of `map_batches` is the user-defined function (UDF), which can either be a function or a class. Since we are using a class in this case, the UDF will run as long-running [Ray actors](actor-guide). For class-based UDFs, we use the `compute` argument to specify {class}`ActorPoolStrategy <ray.data.ActorPoolStrategy>` with the number of parallel actors.\n",
"\n",
"The first parameter of {meth}`map_batches <ray.data.Dataset.map_batches>` is the user-defined function (UDF), which can either be a function or a class. In this case, we use a class-based UDF which is run as long-running [Ray actors](https://docs.ray.io/en/latest/ray-core/key-concepts.html#actors). For class-based UDFs, we use the `compute` argument to specify {class}`ActorPoolStrategy <ray.data.dataset_internal.compute.ActorPoolStrategy>` with the number of parallel actors. \n",
"The `batch_size` argument indicates the number of images in each batch. See the Ray dashboard\n",
"for GPU memory usage to experiment with the `batch_size` when using your own model and dataset.\n",
"\n",
"The `batch_size` argument indicates the number of images in each batch.\n",
"The `num_gpus` argument specifies the number of GPUs needed for each `ResnetModel` instance. In this case, we want 1 GPU for each model replica. If you are doing CPU inference, you can remove the `num_gpus=1`."
]
},
@@ -423,24 +446,28 @@
"source": [
"predictions = transformed_ds.map_batches(\n",
" ResnetModel,\n",
" compute=ray.data.ActorPoolStrategy(size=4), # Use 4 GPUs. Change this number based on the number of GPUs in your cluster.\n",
" compute=ray.data.ActorPoolStrategy(\n",
" size=4\n",
" ), # Use 4 GPUs. Change this number based on the number of GPUs in your cluster.\n",
" num_gpus=1, # Specify 1 GPU per model replica.\n",
" batch_size=1024 # Use the largest batch size that can fit on our GPUs\n",
")"
" batch_size=720, # Use the largest batch size that can fit on our GPUs\n",
")\n"
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {},
"source": [
"### Verify and Save Results"
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {},
"source": [
"Let's take a small batch and verify the results."
"Let's take a small batch of predictions and verify the results."
]
},
{
@@ -451,10 +478,11 @@
},
"outputs": [],
"source": [
"prediction_batch = predictions.take_batch(5)"
"prediction_batch = predictions.take_batch(5)\n"
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {},
"source": [
@@ -557,23 +585,40 @@
"source": [
"from PIL import Image\n",
"\n",
"for image, prediction in zip(prediction_batch[\"original_image\"], prediction_batch[\"predicted_label\"]):\n",
"for image, prediction in zip(\n",
" prediction_batch[\"original_image\"], prediction_batch[\"predicted_label\"]\n",
"):\n",
" img = Image.fromarray(image)\n",
" display(img)\n",
" print(\"Label: \", prediction)"
" print(\"Label: \", prediction)\n"
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {
"tags": []
},
"source": [
"If the samples look good, we can proceed with saving the results to an external storage, e.g., S3 or local disks. See [Ray Data Input/Output](https://docs.ray.io/en/latest/data/api/input_output.html) for all supported stoarges and file formats.\n",
"If the samples look good, we can proceed with saving the results to an external storage, e.g., S3 or local disks. See [the guide on saving data](https://docs.ray.io/en/latest/data/saving-data.html) for all supported storage and file formats.\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import tempfile\n",
"\n",
"temp_dir = tempfile.mkdtemp()\n",
"\n",
"# Don't save the images as part of the predictions.\n",
"predictions = predictions.drop_columns([\"original_image\"])\n",
"\n",
"```python\n",
"ds.write_parquet(\"local://tmp/inference_results\")\n",
"```"
"# The `local://` prefix is need to make sure all results get written on the head node.\n",
"predictions.write_parquet(f\"local://{temp_dir}\")\n",
"print(f\"Predictions saved to `{temp_dir}`!\")\n"
]
}
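[Editor's note: as a final check, the saved Parquet files can be read back with `ray.data.read_parquet` to verify the round trip. A small sketch, assuming the same head-node session where `temp_dir` was written.]

```python
# Read the saved predictions back and peek at a few rows.
loaded = ray.data.read_parquet(temp_dir)
print(loaded.take(3))
```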
],