
Comparing changes

This is a direct comparison between two commits made in this repository or its related repositories.

base repository: lancedb/lance
base: 5c685c08f0d8ebe52c4626e1e02381243321c050
..
head repository: lancedb/lance
compare: ac98d0b4a9bff9d8a48c8f1f8a2bb4d213b41bd0
Showing with 3,387 additions and 1,746 deletions.
  1. +5 −0 protos/encodings.proto
  2. +6 −1 python/Cargo.toml
  3. +5 −5 python/python/benchmarks/test_file.py
  4. +2 −1 python/python/lance/__init__.py
  5. +69 −0 python/python/lance/blob.py
  6. +63 −18 python/python/lance/dataset.py
  7. +5 −0 python/python/lance/file.py
  8. +9 −0 python/python/lance/lance/__init__.pyi
  9. +13 −3 python/python/lance/vector.py
  10. +97 −0 python/python/tests/test_blob.py
  11. +4 −0 python/python/tests/test_dataset.py
  12. +3 −1 python/python/tests/test_huggingface.py
  13. +96 −0 python/python/tests/test_schema_evolution.py
  14. +52 −12 python/src/dataset.rs
  15. +94 −0 python/src/dataset/blob.rs
  16. +2 −0 python/src/lib.rs
  17. +0 −1 python/src/reader.rs
  18. +17 −1 rust/lance-core/src/datatypes.rs
  19. +19 −6 rust/lance-core/src/datatypes/field.rs
  20. +10 −1 rust/lance-core/src/datatypes/schema.rs
  21. +49 −4 rust/lance-datafusion/src/projection.rs
  22. +51 −2 rust/lance-datagen/src/generator.rs
  23. +26 −10 rust/lance-encoding/src/data.rs
  24. +11 −4 rust/lance-encoding/src/decoder.rs
  25. +6 −8 rust/lance-encoding/src/encoder.rs
  26. +4 −15 rust/lance-encoding/src/encodings/logical/blob.rs
  27. +179 −34 rust/lance-encoding/src/encodings/logical/primitive.rs
  28. +962 −962 rust/lance-encoding/src/encodings/physical/bitpack_fastlanes.rs
  29. +56 −1 rust/lance-encoding/src/encodings/physical/value.rs
  30. +10 −3 rust/lance-encoding/src/format.rs
  31. +9 −2 rust/lance-encoding/src/testing.rs
  32. +4 −5 rust/lance-file/src/v2/reader.rs
  33. +547 −331 rust/lance-index/src/scalar/inverted/builder.rs
  34. +140 −51 rust/lance-index/src/scalar/inverted/index.rs
  35. +33 −9 rust/lance/src/dataset.rs
  36. +318 −0 rust/lance/src/dataset/blob.rs
  37. +77 −18 rust/lance/src/dataset/fragment.rs
  38. +19 −13 rust/lance/src/dataset/scanner.rs
  39. +116 −34 rust/lance/src/dataset/schema_evolution.rs
  40. +84 −29 rust/lance/src/dataset/take.rs
  41. +8 −2 rust/lance/src/dataset/write/merge_insert.rs
  42. +1 −1 rust/lance/src/io/exec.rs
  43. +4 −1 rust/lance/src/io/exec/pushdown_scan.rs
  44. +84 −123 rust/lance/src/io/exec/scan.rs
  45. +18 −34 rust/lance/src/io/exec/take.rs
5 changes: 5 additions & 0 deletions protos/encodings.proto
@@ -298,8 +298,13 @@ message MiniBlockLayout {
ArrayEncoding value_compression = 3;
}

message AllNullLayout {

}

message PageLayout {
oneof layout {
MiniBlockLayout mini_block_layout = 1;
AllNullLayout all_null_layout = 2;
}
}
7 changes: 6 additions & 1 deletion python/Cargo.toml
@@ -16,6 +16,7 @@ arrow = { version = "52.2", features = ["pyarrow"] }
arrow-array = "52.2"
arrow-data = "52.2"
arrow-schema = "52.2"
arrow-select = "52.2"
object_store = "0.10.1"
async-trait = "0.1"
chrono = "0.4.31"
@@ -42,7 +43,11 @@ lance-table = { path = "../rust/lance-table" }
lazy_static = "1"
log = "0.4"
prost = "0.12.2"
pyo3 = { version = "0.21", features = ["extension-module", "abi3-py39", "gil-refs"] }
pyo3 = { version = "0.21", features = [
"extension-module",
"abi3-py39",
"gil-refs",
] }
tokio = { version = "1.23", features = ["rt-multi-thread"] }
uuid = "1.3.0"
serde_json = "1"
10 changes: 5 additions & 5 deletions python/python/benchmarks/test_file.py
@@ -43,7 +43,7 @@ def read_all():

result = benchmark.pedantic(read_all, rounds=1, iterations=1)

# assert result.num_rows == NUM_ROWS
assert result.num_rows == NUM_ROWS


@pytest.mark.parametrize(
@@ -80,7 +80,7 @@ def read_all():

result = benchmark.pedantic(read_all, rounds=1, iterations=1)

# assert result.num_rows == NUM_ROWS
assert result.num_rows == NUM_ROWS


@pytest.mark.benchmark(group="scan_single_column")
@@ -129,7 +129,7 @@ def read_all():

result = benchmark.pedantic(read_all, rounds=1, iterations=1)

# assert result.num_rows == NUM_ROWS
assert result.num_rows == NUM_ROWS


@pytest.mark.parametrize(
@@ -166,7 +166,7 @@ def sample():

result = benchmark.pedantic(sample, rounds=30, iterations=1)

# assert result.num_rows == NUM_ROWS
assert result.num_rows == NUM_ROWS


@pytest.mark.benchmark(group="sample_single_column")
@@ -217,4 +217,4 @@ def sample():

result = benchmark.pedantic(sample, rounds=30, iterations=1)

# assert result.num_rows == NUM_ROWS
assert result.num_rows == NUM_ROWS
3 changes: 2 additions & 1 deletion python/python/lance/__init__.py
@@ -5,7 +5,7 @@

from typing import TYPE_CHECKING, Dict, Optional, Union

from .blob import BlobColumn
from .blob import BlobColumn, BlobFile
from .dataset import (
LanceDataset,
LanceOperation,
@@ -31,6 +31,7 @@

__all__ = [
"BlobColumn",
"BlobFile",
"FragmentMetadata",
"LanceDataset",
"LanceFragment",
69 changes: 69 additions & 0 deletions python/python/lance/blob.py
@@ -6,6 +6,8 @@

import pyarrow as pa

from lance.lance import LanceBlobFile


class BlobIterator:
def __init__(self, binary_iter: Iterator[pa.BinaryScalar]):
@@ -19,6 +21,16 @@ def __next__(self) -> Optional[IO[bytes]]:


class BlobColumn:
"""
A utility to wrap a PyArrow binary column and iterate over the rows as
file-like objects.
This can be useful for working with medium-to-small binary objects that need
to interface with APIs that expect file-like objects. For very large binary
objects (4-8MB or more per value) you might be better off creating a blob column
and using :ref:`lance.Dataset.take_blobs` to access the blob data.
"""

def __init__(self, blob_column: Union[pa.Array, pa.ChunkedArray]):
if not isinstance(blob_column, (pa.Array, pa.ChunkedArray)):
raise ValueError(
@@ -35,3 +47,60 @@ def __init__(self, blob_column: Union[pa.Array, pa.ChunkedArray]):

def __iter__(self) -> Iterator[IO[bytes]]:
return BlobIterator(iter(self.blob_column))
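
For illustration, a minimal usage sketch of the BlobColumn wrapper described above; the dataset path and the "payload" column name are hypothetical:

```python
import lance
from lance import BlobColumn

ds = lance.dataset("./my_dataset.lance")        # hypothetical dataset
table = ds.to_table(columns=["payload"])        # hypothetical binary column

# Each row of the wrapped column is exposed as a file-like object
for fobj in BlobColumn(table["payload"]):
    header = fobj.read(16)                      # read the first 16 bytes of the value
    print(header)
```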


class BlobFile(io.RawIOBase):
"""
Represents a blob in a Lance dataset as a file-like object.
"""

def __init__(self, inner: LanceBlobFile):
"""
Internal only: To obtain a BlobFile use :ref:`lance.Dataset.take_blobs`.
"""
self.inner = inner

## Note: most methods undocumented since they are defined by
## the base class.
def close(self) -> None:
self.inner.close()

@property
def closed(self) -> bool:
return self.inner.is_closed()

def readable(self) -> bool:
return True

def seek(self, offset: int, whence: int = io.SEEK_SET) -> int:
if whence == io.SEEK_SET:
self.inner.seek(offset)
elif whence == io.SEEK_CUR:
self.inner.seek(self.inner.tell() + offset)
elif whence == io.SEEK_END:
self.inner.seek(self.inner.size() + offset)
else:
raise ValueError(f"Invalid whence: {whence}")

return self.inner.tell()

def seekable(self) -> bool:
return True

def tell(self) -> int:
return self.inner.tell()

def size(self) -> int:
"""
Returns the size of the blob in bytes.
"""
return self.inner.size()

def readall(self) -> bytes:
return self.inner.readall()

def readinto(self, b: bytearray) -> int:
return self.inner.read_into(b)

def __repr__(self) -> str:
return f"<BlobFile size={self.size()}>"
81 changes: 63 additions & 18 deletions python/python/lance/dataset.py
@@ -35,6 +35,7 @@
import pyarrow.dataset
from pyarrow import RecordBatch, Schema

from .blob import BlobFile
from .dependencies import (
_check_for_hugging_face,
_check_for_numpy,
@@ -713,6 +714,28 @@ def _take_rows(
[self._ds.take_rows(row_ids, columns, columns_with_transform)]
)

def take_blobs(
self,
row_ids: Union[List[int], pa.Array],
blob_column: str,
) -> List[BlobFile]:
"""
Select blobs by row_ids.
Parameters
----------
row_ids : List Array or array-like
row IDs to select in the dataset.
blob_column : str
The name of the blob column to select.
Returns
-------
blob_files : List[BlobFile]
"""
lance_blob_files = self._ds.take_blobs(row_ids, blob_column)
return [BlobFile(lance_blob_file) for lance_blob_file in lance_blob_files]
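
A hedged example of the `take_blobs` API described above; the row ids, column name, and output paths are illustrative:

```python
import shutil
import lance

ds = lance.dataset("./videos.lance")                     # hypothetical dataset
blobs = ds.take_blobs([0, 7, 42], blob_column="video")   # hypothetical column

# Each element is a BlobFile, so it can be handed directly to APIs
# that expect a readable file-like object.
for i, blob in enumerate(blobs):
    with open(f"video_{i}.bin", "wb") as out:
        shutil.copyfileobj(blob, out)
```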

def head(self, num_rows, **kwargs):
"""
Load the first N rows of the dataset.
@@ -891,23 +914,28 @@ def merge(

def add_columns(
self,
transforms: Dict[str, str] | BatchUDF,
transforms: Dict[str, str] | BatchUDF | ReaderLike,
read_columns: List[str] | None = None,
reader_schema: Optional[pa.Schema] = None,
):
"""
Add new columns with defined values.
There are two ways to specify the new columns. First, you can provide
There are several ways to specify the new columns. First, you can provide
SQL expressions for each new column. Second, you can provide a UDF that
takes a batch of existing data and returns a new batch with the new
columns. These new columns will be appended to the dataset.
You can also provide a RecordBatchReader which will read the new column
values from some external source. This is often useful when the new column
values have already been staged to files (often by some distributed process).
See the :func:`lance.add_columns_udf` decorator for more information on
writing UDFs.
Parameters
----------
transforms : dict or AddColumnsUDF
transforms : dict or AddColumnsUDF or ReaderLike
If this is a dictionary, then the keys are the names of the new
columns and the values are SQL expression strings. These strings can
reference existing columns in the dataset.
@@ -917,6 +945,9 @@ def add_columns(
The names of the columns that the UDF will read. If None, then the
UDF will read all columns. This is only used when transforms is a
UDF. Otherwise, the read columns are inferred from the SQL expressions.
reader_schema : pa.Schema, optional
Only valid if transforms is a `ReaderLike` object. This will be used to
determine the schema of the reader.
Examples
--------
@@ -965,7 +996,17 @@ def add_columns(
f"Column expressions must be a string. Got {type(k)}"
)
else:
raise TypeError("transforms must be a dict or AddColumnsUDF")
try:
reader = _coerce_reader(transforms, reader_schema)
self._ds.add_columns_from_reader(reader)
return

except TypeError as inner_err:
raise TypeError(
"transforms must be a dict, AddColumnsUDF, or a ReaderLike value. "
f"Received {type(transforms)}. Could not coerce to a "
f"reader: {inner_err}"
)

self._ds.add_columns(transforms, read_columns)
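
As a sketch of the new ReaderLike path described in the docstring above (the column name, values, and row count are illustrative; the reader is expected to yield one value per existing row):

```python
import pyarrow as pa
import lance

ds = lance.dataset("./my_dataset.lance")    # hypothetical dataset with 3 rows

# Column values staged elsewhere (e.g. by a distributed job)
new_schema = pa.schema([pa.field("embedding_norm", pa.float32())])
batch = pa.record_batch(
    [pa.array([0.1, 0.2, 0.3], pa.float32())], schema=new_schema
)
reader = pa.RecordBatchReader.from_batches(new_schema, [batch])

# transforms may now be any ReaderLike value; reader_schema is only needed
# when the input cannot describe its own schema.
ds.add_columns(reader, reader_schema=new_schema)
```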

@@ -1252,6 +1293,7 @@ def create_scalar_index(
Literal["BITMAP"],
Literal["LABEL_LIST"],
Literal["INVERTED"],
Literal["FTS"],
],
name: Optional[str] = None,
*,
@@ -1305,7 +1347,7 @@ def create_scalar_index(
contains lists of tags (e.g. ``["tag1", "tag2", "tag3"]``) can be indexed
with a ``LABEL_LIST`` index. This index can only speedup queries with
``array_has_any`` or ``array_has_all`` filters.
* ``INVERTED``. It is used to index document columns. This index
* ``FTS/INVERTED``. It is used to index document columns. This index
can conduct full-text searches. For example, a query for "hello world"
matches rows whose document contains any of those words, and the results
will be ranked by BM25.
@@ -1322,7 +1364,7 @@ def create_scalar_index(
or string column.
index_type : str
The type of the index. One of ``"BTREE"``, ``"BITMAP"``,
``"LABEL_LIST"`` or ``"INVERTED"``.
``"LABEL_LIST"``, "FTS" or ``"INVERTED"``.
name : str, optional
The index name. If not provided, it will be generated from the
column name.
@@ -1409,7 +1451,7 @@ def create_scalar_index(
elif index_type == "LABEL_LIST":
if not pa.types.is_list(field.type):
raise TypeError(f"LABEL_LIST index column {column} must be a list")
elif index_type == "INVERTED":
elif index_type in ["INVERTED", "FTS"]:
if not pa.types.is_string(field.type) and not pa.types.is_large_string(
field.type
):
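
For reference, a minimal sketch of the new "FTS" alias accepted above; the dataset path and column name are assumptions, and the query call assumes the existing `full_text_query` scan option:

```python
import lance

ds = lance.dataset("./documents.lance")   # hypothetical dataset with a string column "text"

# "FTS" is accepted as an alias for "INVERTED"; the column must be a
# string or large_string column.
ds.create_scalar_index("text", index_type="FTS")

# Full-text search over the indexed column, ranked by BM25
results = ds.to_table(full_text_query="hello world")
```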
@@ -1650,17 +1692,20 @@ def create_index(
logging.info("Doing one-pass ivfpq accelerated computations")

timers["ivf+pq_train:start"] = time.time()
ivf_centroids, ivf_kmeans, pq_codebook, pq_kmeans_list = (
one_pass_train_ivf_pq_on_accelerator(
self,
column[0],
num_partitions,
metric,
accelerator,
num_sub_vectors=num_sub_vectors,
batch_size=20480,
filter_nan=filter_nan,
)
(
ivf_centroids,
ivf_kmeans,
pq_codebook,
pq_kmeans_list,
) = one_pass_train_ivf_pq_on_accelerator(
self,
column[0],
num_partitions,
metric,
accelerator,
num_sub_vectors=num_sub_vectors,
batch_size=20480,
filter_nan=filter_nan,
)
timers["ivf+pq_train:end"] = time.time()
ivfpq_train_time = timers["ivf+pq_train:end"] - timers["ivf+pq_train:start"]
5 changes: 5 additions & 0 deletions python/python/lance/file.py
@@ -1,6 +1,7 @@
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright The Lance Authors

from pathlib import Path
from typing import Dict, Optional, Union

import pyarrow as pa
@@ -69,6 +70,8 @@ def __init__(self, path: str, storage_options: Optional[Dict[str, str]] = None):
Extra options to be used for a particular storage connection. This is
used to store connection parameters like credentials, endpoint, etc.
"""
if isinstance(path, Path):
path = str(path)
self._reader = _LanceFileReader(path, storage_options=storage_options)

def read_all(self, *, batch_size: int = 1024, batch_readahead=16) -> ReaderResults:
@@ -202,6 +205,8 @@ def __init__(
Extra options to be used for a particular storage connection. This is
used to store connection parameters like credentials, endpoint, etc.
"""
if isinstance(path, Path):
path = str(path)
self._writer = _LanceFileWriter(
path,
schema,
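
A brief sketch of the pathlib.Path coercion added above, assuming the existing `write_batch`/`read_all` APIs; the file path and schema are illustrative:

```python
from pathlib import Path

import pyarrow as pa
from lance.file import LanceFileReader, LanceFileWriter

path = Path("/tmp/example.lance")           # pathlib.Path is now accepted as well as str
schema = pa.schema([pa.field("x", pa.int64())])

writer = LanceFileWriter(path, schema)
writer.write_batch(pa.record_batch([pa.array([1, 2, 3], pa.int64())], schema=schema))
writer.close()

table = LanceFileReader(path).read_all().to_table()
print(table)
```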
9 changes: 9 additions & 0 deletions python/python/lance/lance/__init__.pyi
@@ -84,3 +84,12 @@ class LanceFileMetadata:

class _Session:
def size_bytes(self) -> int: ...

class LanceBlobFile:
def close(self): ...
def is_closed(self) -> bool: ...
def seek(self, offset: int): ...
def tell(self) -> int: ...
def size(self) -> int: ...
def readall(self) -> bytes: ...
def readinto(self, b: bytearray) -> int: ...