Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: cuvs acceleration for gpu k-means #2816

Merged
merged 33 commits into from
Sep 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
893088e
Accelerate k-means with cuvs
jacketsj Aug 31, 2024
9907154
Accelerate k-means with cuvs
jacketsj Sep 4, 2024
2e9a064
Clean up cuvs code, add it as a dependency
jacketsj Aug 31, 2024
d3a9583
Add time logging, and clean up some param choices
jacketsj Aug 31, 2024
0795607
Change use_cuvs to default=False and use logging.info for logging
jacketsj Aug 31, 2024
d2f1b36
Remove 'time_init'
jacketsj Aug 31, 2024
498fe00
Autoformat
jacketsj Sep 4, 2024
95674fc
Revert batch size
jacketsj Aug 31, 2024
0825198
Increase itopk_size over time, up to a reasonable limit
jacketsj Aug 31, 2024
4412cb9
Adjust import methodology
jacketsj Aug 31, 2024
7d66605
Simplify imports
jacketsj Aug 31, 2024
5d59807
Run autoformatter
jacketsj Aug 31, 2024
a71879c
More formatting
jacketsj Aug 31, 2024
2318b0b
Remove f strings in logging statements
jacketsj Aug 31, 2024
e74aa83
Split line
jacketsj Aug 31, 2024
f9b1a8c
Update based on linter
jacketsj Aug 31, 2024
a33cdaf
Run correct version of autoformatter
jacketsj Aug 31, 2024
109f844
subimports for cuvs and pylibraft
jacketsj Aug 31, 2024
33c8678
Add cuvs and pylibraft to full dependencies as a temporary measure
jacketsj Aug 31, 2024
5ebf485
Sort import block
jacketsj Aug 31, 2024
7919477
Clean up commented code
jacketsj Sep 4, 2024
58c5c98
Warnings -> Errors
jacketsj Sep 5, 2024
9cc06af
Move modified kmeans module to cuvs/kmeans.py
jacketsj Sep 5, 2024
4c5eb55
Setup multiple optional cuvs dependencies for different python versions
jacketsj Sep 5, 2024
a6647b1
Fix imports, use cagra and device_ndarray directly
jacketsj Sep 6, 2024
549ad27
Add missing cuvs module init file
jacketsj Sep 6, 2024
d035431
Integrate cuvs kmeans into training/assignments for ivf
jacketsj Sep 6, 2024
75744c3
Run linter
jacketsj Sep 6, 2024
15c243a
Move import check to top of kmeans.py
jacketsj Sep 6, 2024
28259aa
Seemingly finally fix optional submodule dependencies
jacketsj Sep 7, 2024
16e81c3
Run ruff fixes
jacketsj Sep 7, 2024
a6f129f
Add missing license header
jacketsj Sep 7, 2024
48ce370
Merge branch 'main' into jack/cuvs-accel
jacketsj Sep 23, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions python/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,20 @@ tests = [
dev = ["ruff==0.4.1"]
benchmarks = ["pytest-benchmark"]
torch = ["torch"]
cuvs-py39 = [
"cuvs-cu12 @ https://pypi.nvidia.com/cuvs-cu12/cuvs_cu12-24.8.0-cp39-cp39-manylinux_2_28_x86_64.whl",
"pylibraft-cu12 @ https://pypi.nvidia.com/pylibraft-cu12/pylibraft_cu12-24.8.1-cp39-cp39-manylinux_2_28_x86_64.whl"
]

cuvs-py310 = [
"cuvs-cu12 @ https://pypi.nvidia.com/cuvs-cu12/cuvs_cu12-24.8.0-cp310-cp310-manylinux_2_28_x86_64.whl",
"pylibraft-cu12 @ https://pypi.nvidia.com/pylibraft-cu12/pylibraft_cu12-24.8.1-cp310-cp310-manylinux_2_28_x86_64.whl"
]

cuvs-py311 = [
"cuvs-cu12 @ https://pypi.nvidia.com/cuvs-cu12/cuvs_cu12-24.8.0-cp311-cp311-manylinux_2_28_x86_64.whl",
"pylibraft-cu12 @ https://pypi.nvidia.com/pylibraft-cu12/pylibraft_cu12-24.8.1-cp311-cp311-manylinux_2_28_x86_64.whl"
]
ray = ["ray[data]; python_version<'3.12'"]

[tool.ruff]
Expand Down
2 changes: 2 additions & 0 deletions python/python/lance/cuvs/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright The Lance Authors
143 changes: 143 additions & 0 deletions python/python/lance/cuvs/kmeans.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright The Lance Authors


import logging
import time
from typing import Literal, Optional, Tuple, Union

import pyarrow as pa

from lance.dependencies import cagra, raft_common, torch
from lance.dependencies import numpy as np
from lance.torch.kmeans import KMeans as KMeansTorch

__all__ = ["KMeans"]


class KMeans(KMeansTorch):
"""K-Means trains over vectors and divide into K clusters,
using cuVS as accelerator.

This implement is built on PyTorch+cuVS, supporting Nvidia GPU only.

Parameters
----------
k: int
The number of clusters
metric : str
Metric type, support "l2", "cosine" or "dot"
init: str
Initialization method. Only support "random" now.
max_iters: int
Max number of iterations to train the kmean model.
tolerance: float
Relative tolerance in regard to Frobenius norm of the difference in
the cluster centers of two consecutive iterations to declare convergence.
centroids : torch.Tensor, optional.
Provide existing centroids.
seed: int, optional
Random seed
device: str, optional
The device to run the PyTorch algorithms. Default we will pick
the most performant device on the host. See `lance.torch.preferred_device()`
For the cuVS implementation, it will be verified this is a cuda device.
"""

def __init__(
self,
k: int,
*,
metric: Literal["l2", "euclidean", "cosine", "dot"] = "l2",
init: Literal["random"] = "random",
max_iters: int = 50,
tolerance: float = 1e-4,
centroids: Optional[torch.Tensor] = None,
seed: Optional[int] = None,
device: Optional[str] = None,
itopk_size: int = 10,
):
if metric == "dot":
raise ValueError(
'Kmeans::__init__: metric == "dot" is incompatible' " with cuVS"
)
super().__init__(
k,
metric=metric,
init=init,
max_iters=max_iters,
tolerance=tolerance,
centroids=centroids,
seed=seed,
device=device,
)

if self.device.type != "cuda" or not torch.cuda.is_available():
raise ValueError("KMeans::__init__: cuda is not enabled/available")

self.itopk_size = itopk_size
self.time_rebuild = 0.0
self.time_search = 0.0

def fit(
self,
data: Union[
torch.utils.data.IterableDataset,
np.ndarray,
torch.Tensor,
pa.FixedSizeListArray,
],
) -> None:
self.time_rebuild = 0.0
self.time_search = 0.0
super().fit(data)
logging.info("Total search time: %s", self.time_search)
logging.info("Total rebuild time: %s", self.time_rebuild)

def rebuild_index(self):
rebuild_time_start = time.time()
cagra_metric = "sqeuclidean"
dim = self.centroids.shape[1]
graph_degree = max(dim // 4, 32)
nn_descent_degree = graph_degree * 2
index_params = cagra.IndexParams(
metric=cagra_metric,
intermediate_graph_degree=nn_descent_degree,
graph_degree=graph_degree,
build_algo="nn_descent",
compression=None,
)
self.index = cagra.build(index_params, self.centroids)
rebuild_time_end = time.time()
self.time_rebuild += rebuild_time_end - rebuild_time_start

self.y2 = None

def _transform(
self,
data: torch.Tensor,
y2: Optional[torch.Tensor] = None,
) -> Tuple[torch.Tensor, torch.Tensor]:
if self.metric == "cosine":
data = torch.nn.functional.normalize(data)

search_time_start = time.time()
device = torch.device("cuda")
out_idx = raft_common.device_ndarray.empty((data.shape[0], 1), dtype="uint32")
out_dist = raft_common.device_ndarray.empty((data.shape[0], 1), dtype="float32")
search_params = cagra.SearchParams(itopk_size=self.itopk_size)
cagra.search(
search_params,
self.index,
data,
1,
neighbors=out_idx,
distances=out_dist,
)
ret = (
torch.as_tensor(out_idx, device=device).squeeze(dim=1).view(torch.int32),
torch.as_tensor(out_dist, device=device),
)
search_time_end = time.time()
self.time_search += search_time_end - search_time_start
return ret
10 changes: 10 additions & 0 deletions python/python/lance/dependencies.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
_PANDAS_AVAILABLE = True
_POLARS_AVAILABLE = True
_TORCH_AVAILABLE = True
_CAGRA_AVAILABLE = True
_RAFT_COMMON_AVAILABLE = True
_HUGGING_FACE_AVAILABLE = True
_TENSORFLOW_AVAILABLE = True
_RAY_AVAILABLE = True
Expand All @@ -48,6 +50,8 @@ class _LazyModule(ModuleType):
"pandas": "pd.",
"polars": "pl.",
"torch": "torch.",
"cagra": "cagra.",
"common": "raft_common.",
"tensorflow": "tf.",
"ray": "ray.",
}
Expand Down Expand Up @@ -172,6 +176,8 @@ def _lazy_import(module_name: str) -> tuple[ModuleType, bool]:
pandas, _PANDAS_AVAILABLE = _lazy_import("pandas")
polars, _POLARS_AVAILABLE = _lazy_import("polars")
torch, _TORCH_AVAILABLE = _lazy_import("torch")
cagra, _CAGRA_AVAILABLE = _lazy_import("cuvs.neighbors.cagra")
raft_common, _RAFT_COMMON_AVAILABLE = _lazy_import("pylibraft.common")
datasets, _HUGGING_FACE_AVAILABLE = _lazy_import("datasets")
tensorflow, _TENSORFLOW_AVAILABLE = _lazy_import("tensorflow")
ray, _RAY_AVAILABLE = _lazy_import("ray")
Expand Down Expand Up @@ -238,6 +244,8 @@ def _check_for_ray(obj: Any, *, check_type: bool = True) -> bool:
"ray",
"tensorflow",
"torch",
"cagra",
"raft_common",
# lazy utilities
"_check_for_hugging_face",
"_check_for_numpy",
Expand All @@ -252,6 +260,8 @@ def _check_for_ray(obj: Any, *, check_type: bool = True) -> bool:
"_PANDAS_AVAILABLE",
"_POLARS_AVAILABLE",
"_TORCH_AVAILABLE",
"_CAGRA_AVAILABLE",
"_RAFT_COMMON_AVAILABLE",
"_HUGGING_FACE_AVAILABLE",
"_TENSORFLOW_AVAILABLE",
"_RAY_AVAILABLE",
Expand Down
15 changes: 12 additions & 3 deletions python/python/lance/torch/kmeans.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,11 @@
import pyarrow as pa
from tqdm import tqdm

from lance.dependencies import _check_for_numpy, _check_for_torch, torch
from lance.dependencies import (
_check_for_numpy,
_check_for_torch,
torch,
)
from lance.dependencies import numpy as np

from . import preferred_device
Expand Down Expand Up @@ -79,6 +83,8 @@ def __init__(
self.tolerance = tolerance
self.seed = seed

self.y2 = None

def __repr__(self):
return f"KMeans(k={self.k}, metric={self.metric}, device={self.device})"

Expand Down Expand Up @@ -220,14 +226,14 @@ def _fit_once(
)
counts_per_part = torch.zeros(self.centroids.shape[0], device=self.device)
ones = torch.ones(1024 * 16, device=self.device)
y2 = (self.centroids * self.centroids).sum(dim=1)
self.rebuild_index()
for idx, chunk in enumerate(data):
if idx % 50 == 0:
logging.info("Kmeans::train: epoch %s, chunk %s", epoch, idx)
chunk: torch.Tensor = chunk
dtype = chunk.dtype
chunk = chunk.to(self.device)
ids, dists = self._transform(chunk, y2=y2)
ids, dists = self._transform(chunk, y2=self.y2)

valid_mask = ids >= 0
if torch.any(~valid_mask):
Expand Down Expand Up @@ -263,6 +269,9 @@ def _fit_once(
)
return total_dist

def rebuild_index(self):
self.y2 = (self.centroids * self.centroids).sum(dim=1)

def _transform(
self,
data: torch.Tensor,
Expand Down
35 changes: 26 additions & 9 deletions python/python/lance/vector.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,11 @@ def train_ivf_centroids_on_accelerator(
) -> (np.ndarray, str):
"""Use accelerator (GPU or MPS) to train kmeans."""
if isinstance(accelerator, str) and (
not (CUDA_REGEX.match(accelerator) or accelerator == "mps")
not (
CUDA_REGEX.match(accelerator)
or accelerator == "mps"
or accelerator == "cuvs"
)
):
raise ValueError(
"Train ivf centroids on accelerator: "
Expand Down Expand Up @@ -181,14 +185,27 @@ def train_ivf_centroids_on_accelerator(
cache=True,
)

logging.info("Training IVF partitions using GPU(%s)", accelerator)
kmeans = KMeans(
k,
max_iters=max_iters,
metric=metric_type,
device=accelerator,
centroids=init_centroids,
)
if accelerator == "cuvs":
logging.info("Training IVF partitions using cuVS+GPU")
print("Training IVF partitions using cuVS+GPU")
from lance.cuvs.kmeans import KMeans as KMeansCuVS

kmeans = KMeansCuVS(
k,
max_iters=max_iters,
metric=metric_type,
device="cuda",
centroids=init_centroids,
)
else:
logging.info("Training IVF partitions using GPU(%s)", accelerator)
kmeans = KMeans(
k,
max_iters=max_iters,
metric=metric_type,
device=accelerator,
centroids=init_centroids,
)
kmeans.fit(ds)

centroids = kmeans.centroids.cpu().numpy()
Expand Down