Skip to content
Permalink

Comparing changes

This is a direct comparison between two commits made in this repository or its related repositories. View the default comparison for this range or learn more about diff comparisons.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also compare across forks. Learn more about diff comparisons here.
base repository: lancedb/lance
Failed to load repositories. Confirm that selected base ref is valid, then try again.
Loading
base: 8dd5819e562b85f5fafd76305c06558c50da989e
Choose a base ref
..
head repository: lancedb/lance
Failed to load repositories. Confirm that selected head ref is valid, then try again.
Loading
compare: 2c4966f76327e09f35faa768074025e9b3873e70
Choose a head ref
Showing with 2,660 additions and 98 deletions.
  1. +9 −0 protos/table.proto
  2. +9 −0 protos/transaction.proto
  3. +52 −0 python/python/benchmarks/test_index.py
  4. +63 −8 python/python/lance/dataset.py
  5. +199 −8 python/python/lance/vector.py
  6. +141 −0 python/python/tests/test_dataset.py
  7. +5 −1 python/src/dataset.rs
  8. +1 −1 python/src/file.rs
  9. +1 −1 rust/lance-datagen/src/generator.rs
  10. +1 −0 rust/lance-encoding/Cargo.toml
  11. +231 −3 rust/lance-encoding/src/data.rs
  12. +1 −1 rust/lance-encoding/src/decoder.rs
  13. +1 −1 rust/lance-encoding/src/encoder.rs
  14. +3 −1 rust/lance-encoding/src/encodings/logical/list.rs
  15. +5 −1 rust/lance-encoding/src/encodings/logical/primitive.rs
  16. +1 −1 rust/lance-encoding/src/encodings/logical/struct.rs
  17. +5 −1 rust/lance-encoding/src/encodings/physical/basic.rs
  18. +17 −1 rust/lance-encoding/src/encodings/physical/binary.rs
  19. +3 −1 rust/lance-encoding/src/encodings/physical/bitmap.rs
  20. +5 −1 rust/lance-encoding/src/encodings/physical/bitpack.rs
  21. +7 −0 rust/lance-encoding/src/encodings/physical/bitpack_fastlanes.rs
  22. +3 −1 rust/lance-encoding/src/encodings/physical/block_compress.rs
  23. +8 −1 rust/lance-encoding/src/encodings/physical/dictionary.rs
  24. +5 −1 rust/lance-encoding/src/encodings/physical/fixed_size_binary.rs
  25. +9 −1 rust/lance-encoding/src/encodings/physical/fsst.rs
  26. +6 −0 rust/lance-encoding/src/encodings/physical/packed_struct.rs
  27. +3 −1 rust/lance-encoding/src/encodings/physical/value.rs
  28. +1 −0 rust/lance-encoding/src/lib.rs
  29. +1,252 −0 rust/lance-encoding/src/statistics.rs
  30. +1 −1 rust/lance-file/src/v2/writer.rs
  31. +2 −2 rust/lance-table/README.md
  32. +14 −3 rust/lance-table/src/feature_flags.rs
  33. +47 −0 rust/lance-table/src/format/manifest.rs
  34. +4 −0 rust/lance-table/src/io/manifest.rs
  35. +95 −1 rust/lance/src/dataset.rs
  36. +2 −0 rust/lance/src/dataset/fragment.rs
  37. +259 −36 rust/lance/src/dataset/transaction.rs
  38. +54 −12 rust/lance/src/index.rs
  39. +3 −2 rust/lance/src/index/vector/ivf.rs
  40. +121 −0 rust/lance/src/io/commit.rs
  41. +6 −4 rust/lance/src/io/exec/knn.rs
  42. +5 −1 rust/lance/src/utils/test.rs
9 changes: 9 additions & 0 deletions protos/table.proto
Original file line number Diff line number Diff line change
@@ -85,6 +85,8 @@ message Manifest {
// * 1: deletion files are present
// * 2: move_stable_row_ids: row IDs are tracked and stable after move operations
// (such as compaction), but not updates.
// * 4: use v2 format (deprecated)
// * 8: table config is present
uint64 reader_feature_flags = 9;

// Feature flags for writers.
@@ -137,6 +139,13 @@ message Manifest {
//
// This specifies what format is used to store the data files.
DataStorageFormat data_format = 15;

// Table config.
//
// Keys with the prefix "lance." are reserved for the Lance library. Other
// libraries may wish to similarly prefix their configuration keys
// appropriately.
map<string, string> config = 16;
} // Manifest

// Auxiliary Data attached to a version.
9 changes: 9 additions & 0 deletions protos/transaction.proto
Original file line number Diff line number Diff line change
@@ -61,6 +61,8 @@ message Transaction {
repeated lance.file.Field schema = 2;
// Schema metadata.
map<string, bytes> schema_metadata = 3;
// Key-value pairs to merge with existing config.
map<string, string> config_upsert_values = 4;
}

// Add or replace a new secondary index.
@@ -156,6 +158,12 @@ message Transaction {
// The new fragments where updated rows have been moved to.
repeated DataFragment new_fragments = 3;
}

// An operation that updates the table config.
message UpdateConfig {
map<string, string> upsert_values = 1;
repeated string delete_keys = 2;
}

// The operation of this transaction.
oneof operation {
@@ -169,5 +177,6 @@ message Transaction {
ReserveFragments reserve_fragments = 107;
Update update = 108;
Project project = 109;
UpdateConfig update_config = 110;
}
}
52 changes: 52 additions & 0 deletions python/python/benchmarks/test_index.py
Original file line number Diff line number Diff line change
@@ -54,6 +54,41 @@ def test_create_ivf_pq(test_dataset, benchmark):
)


@pytest.mark.benchmark(group="create_index")
def test_create_ivf_pq_torch_cpu(test_dataset, benchmark):
from lance.dependencies import torch

benchmark(
test_dataset.create_index,
column="vector",
index_type="IVF_PQ",
metric_type="L2",
num_partitions=8,
num_sub_vectors=2,
num_bits=8,
replace=True,
accelerator=torch.device("cpu"),
)


@pytest.mark.benchmark(group="create_index")
def test_create_ivf_pq_torch_cpu_one_pass(test_dataset, benchmark):
from lance.dependencies import torch

benchmark(
test_dataset.create_index,
column="vector",
index_type="IVF_PQ",
metric_type="L2",
num_partitions=8,
num_sub_vectors=2,
num_bits=8,
replace=True,
accelerator=torch.device("cpu"),
one_pass_ivfpq=True,
)


@pytest.mark.benchmark(group="create_index")
@pytest.mark.cuda
def test_create_ivf_pq_cuda(test_dataset, benchmark):
@@ -70,6 +105,23 @@ def test_create_ivf_pq_cuda(test_dataset, benchmark):
)


@pytest.mark.benchmark(group="create_index")
@pytest.mark.cuda
def test_create_ivf_pq_cuda_one_pass(test_dataset, benchmark):
benchmark(
test_dataset.create_index,
column="vector",
index_type="IVF_PQ",
metric_type="L2",
num_partitions=8,
num_sub_vectors=2,
num_bits=8,
accelerator="cuda",
replace=True,
one_pass_ivfpq=True,
)


@pytest.mark.benchmark(group="optimize_index")
@pytest.mark.parametrize("num_partitions", [256, 512])
@pytest.mark.parametrize("num_small_indexes", [5])
71 changes: 63 additions & 8 deletions python/python/lance/dataset.py
Original file line number Diff line number Diff line change
@@ -1448,6 +1448,7 @@ def create_index(
precomputed_partition_dataset: Optional[str] = None,
storage_options: Optional[Dict[str, str]] = None,
filter_nan: bool = True,
one_pass_ivfpq: bool = False,
**kwargs,
) -> LanceDataset:
"""Create index on column.
@@ -1508,6 +1509,8 @@ def create_index(
Defaults to True. False is UNSAFE, and will cause a crash if any null/nan
values are present (and otherwise will not). Disables the null filter used
for nullable columns. Obtains a small speed boost.
one_pass_ivfpq: bool
Defaults to False. If enabled, index type must be "IVF_PQ". Reduces disk IO.
kwargs :
Parameters passed to the index building process.
@@ -1631,6 +1634,58 @@ def create_index(
raise NotImplementedError(
f"Only {valid_index_types} index types supported. " f"Got {index_type}"
)
if index_type != "IVF_PQ" and one_pass_ivfpq:
raise ValueError(
f'one_pass_ivfpq requires index_type="IVF_PQ", got {index_type}'
)

# Handle timing for various parts of accelerated builds
timers = {}
if one_pass_ivfpq and accelerator is not None:
from .vector import (
one_pass_assign_ivf_pq_on_accelerator,
one_pass_train_ivf_pq_on_accelerator,
)

logging.info("Doing one-pass ivfpq accelerated computations")

timers["ivf+pq_train:start"] = time.time()
ivf_centroids, ivf_kmeans, pq_codebook, pq_kmeans_list = (
one_pass_train_ivf_pq_on_accelerator(
self,
column[0],
num_partitions,
metric,
accelerator,
num_sub_vectors=num_sub_vectors,
batch_size=20480,
filter_nan=filter_nan,
)
)
timers["ivf+pq_train:end"] = time.time()
ivfpq_train_time = timers["ivf+pq_train:end"] - timers["ivf+pq_train:start"]
logging.info("ivf+pq training time: %ss", ivfpq_train_time)
timers["ivf+pq_assign:start"] = time.time()
shuffle_output_dir, shuffle_buffers = one_pass_assign_ivf_pq_on_accelerator(
self,
column[0],
metric,
accelerator,
ivf_kmeans,
pq_kmeans_list,
batch_size=20480,
filter_nan=filter_nan,
)
timers["ivf+pq_assign:end"] = time.time()
ivfpq_assign_time = (
timers["ivf+pq_assign:end"] - timers["ivf+pq_assign:start"]
)
logging.info("ivf+pq transform time: %ss", ivfpq_assign_time)

kwargs["precomputed_shuffle_buffers"] = shuffle_buffers
kwargs["precomputed_shuffle_buffers_path"] = os.path.join(
shuffle_output_dir, "data"
)
if index_type.startswith("IVF"):
if (ivf_centroids is not None) and (ivf_centroids_file is not None):
raise ValueError(
@@ -1659,9 +1714,6 @@ def create_index(
)
kwargs["num_partitions"] = num_partitions

# Handle timing for various parts of accelerated builds
timers = {}

if (precomputed_partition_dataset is not None) and (ivf_centroids is None):
raise ValueError(
"ivf_centroids must be provided when"
@@ -1692,7 +1744,7 @@ def create_index(
)
kwargs["precomputed_partitions_file"] = precomputed_partition_dataset

if accelerator is not None and ivf_centroids is None:
if accelerator is not None and ivf_centroids is None and not one_pass_ivfpq:
logging.info("Computing new precomputed partition dataset")
# Use accelerator to train ivf centroids
from .vector import (
@@ -1773,6 +1825,7 @@ def create_index(
pq_codebook is None
and accelerator is not None
and "precomputed_partitions_file" in kwargs
and not one_pass_ivfpq
):
logging.info("Computing new precomputed shuffle buffers for PQ.")
partitions_file = kwargs["precomputed_partitions_file"]
@@ -1852,13 +1905,15 @@ def create_index(
if shuffle_partition_concurrency is not None:
kwargs["shuffle_partition_concurrency"] = shuffle_partition_concurrency

times = []
times.append(time.time())
timers["final_create_index:start"] = time.time()
self._ds.create_index(
column, index_type, name, replace, storage_options, kwargs
)
times.append(time.time())
logging.info("Final create_index time: %ss", times[1] - times[0])
timers["final_create_index:end"] = time.time()
final_create_index_time = (
timers["final_create_index:end"] - timers["final_create_index:start"]
)
logging.info("Final create_index rust time: %ss", final_create_index_time)
# Save disk space
if "precomputed_shuffle_buffers_path" in kwargs.keys() and os.path.exists(
kwargs["precomputed_shuffle_buffers_path"]
Loading