feat: add utility for reporting data stats #3328

Merged · 2 commits · Jan 3, 2025
4 changes: 4 additions & 0 deletions python/python/lance/__init__.py
@@ -9,6 +9,8 @@
from . import log
from .blob import BlobColumn, BlobFile
from .dataset import (
    DataStatistics,
    FieldStatistics,
    LanceDataset,
    LanceOperation,
    LanceScanner,
@@ -36,6 +38,8 @@
__all__ = [
    "BlobColumn",
    "BlobFile",
    "DataStatistics",
    "FieldStatistics",
    "FragmentMetadata",
    "LanceDataset",
    "LanceFragment",
21 changes: 21 additions & 0 deletions python/python/lance/dataset.py
@@ -3449,6 +3449,21 @@ def update(self, tag: str, version: int) -> None:
        self._ds.update_tag(tag, version)


@dataclass
class FieldStatistics:
    """Statistics about a field in the dataset"""

    id: int  #: id of the field
    bytes_on_disk: int  #: (possibly compressed) bytes on disk used to store the field


@dataclass
class DataStatistics:
    """Statistics about the data in the dataset"""

    fields: List[FieldStatistics]  #: Statistics about the fields in the dataset


class DatasetStats(TypedDict):
    num_deleted_rows: int
    num_fragments: int
@@ -3485,6 +3500,12 @@ def index_stats(self, index_name: str) -> Dict[str, Any]:
        index_stats = json.loads(self._ds.index_statistics(index_name))
        return index_stats

    def data_stats(self) -> DataStatistics:
        """
        Statistics about the data in the dataset.
        """
        return self._ds.data_stats()


def write_dataset(
    data_obj: ReaderLike,
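
The user-facing addition in dataset.py above is the data_stats() method, reached through the existing LanceDataset.stats property. A minimal usage sketch (the dataset path here is hypothetical):

import lance

ds = lance.dataset("./my_dataset")  # hypothetical local path
stats = ds.stats.data_stats()
for field in stats.fields:
    print(f"field {field.id}: {field.bytes_on_disk} bytes on disk")
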
30 changes: 30 additions & 0 deletions python/python/tests/test_dataset.py
@@ -2730,6 +2730,36 @@ def test_use_scalar_index(tmp_path: Path):
EXPECTED_MINOR_VERSION = 0


def test_stats(tmp_path: Path):
    table = pa.table({"x": [1, 2, 3, 4], "y": ["foo", "bar", "baz", "qux"]})
    dataset = lance.write_dataset(table, tmp_path)
    stats = dataset.stats.dataset_stats()

    assert stats["num_deleted_rows"] == 0
    assert stats["num_fragments"] == 1
    assert stats["num_small_files"] == 1

    data_stats = dataset.stats.data_stats()

    assert data_stats.fields[0].id == 0
    assert data_stats.fields[0].bytes_on_disk == 32
    assert data_stats.fields[1].id == 1
    assert data_stats.fields[1].bytes_on_disk == 44  # 12 bytes data + 32 bytes offset

    dataset.add_columns({"z": "y"})

    dataset.insert(pa.table({"x": [5], "z": ["quux"]}))

    data_stats = dataset.stats.data_stats()

    assert data_stats.fields[0].id == 0
    assert data_stats.fields[0].bytes_on_disk == 40
    assert data_stats.fields[1].id == 1
    assert data_stats.fields[1].bytes_on_disk == 44  # 12 bytes data + 32 bytes offset
    assert data_stats.fields[2].id == 2
    assert data_stats.fields[2].bytes_on_disk == 56  # 16 bytes data + 40 bytes offset


def test_default_storage_version(tmp_path: Path):
    table = pa.table({"x": [0]})
    dataset = lance.write_dataset(table, tmp_path)
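
The expected byte counts in test_stats follow from simple accounting, assuming 8-byte int64 values and one 8-byte offset per row for string columns in the v2 encoding (an assumption inferred from the numbers, not stated in the PR):

# x: four int64 values, 8 bytes each
assert 4 * 8 == 32
# y: "foo" + "bar" + "baz" + "qux" = 12 data bytes, plus 4 * 8 offset bytes
assert 12 + 4 * 8 == 44
# x after the fifth row is inserted
assert 5 * 8 == 40
# z: 16 data bytes ("quux" adds 4), plus 5 * 8 offset bytes
assert 16 + 5 * 8 == 56
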
8 changes: 8 additions & 0 deletions python/src/dataset.rs
@@ -30,6 +30,7 @@ use futures::{StreamExt, TryFutureExt};
use lance::dataset::builder::DatasetBuilder;
use lance::dataset::refs::{Ref, TagContents};
use lance::dataset::scanner::MaterializationStyle;
use lance::dataset::statistics::{DataStatistics, DatasetStatisticsExt};
use lance::dataset::{
fragment::FileFragment as LanceFileFragment,
progress::WriteFragmentProgress,
@@ -87,6 +88,7 @@ pub mod blob;
pub mod cleanup;
pub mod commit;
pub mod optimize;
pub mod stats;

const DEFAULT_NPROBS: usize = 1;
const DEFAULT_INDEX_CACHE_SIZE: usize = 256;
@@ -1232,6 +1234,12 @@ impl Dataset {
            .map_err(|err| PyIOError::new_err(err.to_string()))
    }

    fn data_stats(&self) -> PyResult<PyLance<DataStatistics>> {
        RT.block_on(None, self.ds.calculate_data_stats())?
            .infer_error()
            .map(PyLance)
    }

    fn get_fragments(self_: PyRef<'_, Self>) -> PyResult<Vec<FileFragment>> {
        let core_fragments = self_.ds.get_fragments();

45 changes: 45 additions & 0 deletions python/src/dataset/stats.rs
@@ -0,0 +1,45 @@
// Copyright 2023 Lance Developers.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use lance::dataset::statistics::{DataStatistics, FieldStatistics};
use pyo3::{intern, types::PyAnyMethods, PyObject, Python, ToPyObject};

use crate::utils::{export_vec, PyLance};

impl ToPyObject for PyLance<&FieldStatistics> {
    fn to_object(&self, py: Python<'_>) -> PyObject {
        let cls = py
            .import_bound(intern!(py, "lance"))
            .and_then(|m| m.getattr("FieldStatistics"))
            .expect("FieldStatistics class not found");

        let id = self.0.id;
        let bytes_on_disk = self.0.bytes_on_disk;

        cls.call1((id, bytes_on_disk)).unwrap().to_object(py)
    }
}

impl ToPyObject for PyLance<DataStatistics> {
    fn to_object(&self, py: Python<'_>) -> PyObject {
        let cls = py
            .import_bound(intern!(py, "lance"))
            .and_then(|m| m.getattr("DataStatistics"))
            .expect("DataStatistics class not found");

        let fields = export_vec(py, &self.0.fields);

        cls.call1((fields,)).unwrap().to_object(py)
    }
}
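
Both conversions look up the dataclasses exported from the top-level lance module and call them positionally, so the Python field order is part of the contract. A sketch of what the Rust side effectively relies on, seen from Python:

from lance import DataStatistics, FieldStatistics

# The Rust bindings call FieldStatistics(id, bytes_on_disk) and
# DataStatistics(fields) positionally; reordering the dataclass
# fields would silently break the conversion.
fs = FieldStatistics(0, 32)
ds = DataStatistics([fs])
assert ds.fields[0].bytes_on_disk == 32
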
1 change: 1 addition & 0 deletions rust/lance/src/dataset.rs
@@ -56,6 +56,7 @@ pub mod refs;
pub(crate) mod rowids;
pub mod scanner;
mod schema_evolution;
pub mod statistics;
mod take;
pub mod transaction;
pub mod updater;
52 changes: 51 additions & 1 deletion rust/lance/src/dataset/fragment.rs
@@ -6,7 +6,7 @@
pub mod write;

use std::borrow::Cow;
use std::collections::{BTreeMap, HashSet};
use std::collections::{BTreeMap, HashMap, HashSet};
use std::ops::Range;
use std::sync::Arc;

@@ -48,6 +48,7 @@ use self::write::FragmentCreateBuilder;
use super::hash_joiner::HashJoiner;
use super::rowids::load_row_id_sequence;
use super::scanner::Scanner;
use super::statistics::FieldStatistics;
use super::updater::Updater;
use super::{schema_evolution, NewColumnTransform, WriteParams};
use crate::arrow::*;
@@ -97,6 +98,9 @@ pub trait GenericFileReader: std::fmt::Debug + Send + Sync {
    /// Schema of the reader
    fn projection(&self) -> &Arc<Schema>;

    /// Update storage statistics (ignored by v1 reader)
    fn update_storage_stats(&self, field_stats: &mut HashMap<u32, FieldStatistics>);

    // Helper functions to fall back to the legacy implementation while we
    // slowly migrate functionality over to the generic reader

@@ -240,6 +244,10 @@ impl GenericFileReader for V1Reader {
        self.reader.len() as u32
    }

    fn update_storage_stats(&self, _field_stats: &mut HashMap<u32, FieldStatistics>) {
        // No-op for v1 files
    }

    fn clone_box(&self) -> Box<dyn GenericFileReader> {
        Box::new(self.clone())
    }
@@ -364,6 +372,29 @@ mod v2_adapter {
                .boxed())
        }

        fn update_storage_stats(&self, field_stats: &mut HashMap<u32, FieldStatistics>) {
            let file_statistics = self.reader.file_statistics();
            let column_idx_to_field_id = self
                .field_id_to_column_idx
                .iter()
                .map(|(field_id, column_idx)| (*column_idx, *field_id))
                .collect::<HashMap<_, _>>();

            // Some fields span more than one column. We assume a column that doesn't have an
            // entry in the field_id_to_column_idx map is a continuation of the previous field.
            let mut current_field_id = 0;
            for (column_idx, stats) in file_statistics.columns.iter().enumerate() {
                if let Some(field_id) = column_idx_to_field_id.get(&(column_idx as u32)) {
                    current_field_id = *field_id;
                }
                // If the field_id is not in the map then the field may no longer be part of the
                // dataset
                if let Some(field_stats) = field_stats.get_mut(&current_field_id) {
                    field_stats.bytes_on_disk += stats.size_bytes;
                }
            }
        }

        fn projection(&self) -> &Arc<Schema> {
            &self.projection
        }
@@ -461,6 +492,10 @@ impl GenericFileReader for NullReader {
        self.read_range_tasks(0..num_rows, batch_size, projection)
    }

    fn update_storage_stats(&self, _field_stats: &mut HashMap<u32, FieldStatistics>) {
        // No-op for null reader
    }

    fn projection(&self) -> &Arc<Schema> {
        &self.schema
    }
@@ -622,6 +657,21 @@ impl FileFragment {
        }
    }

    pub(crate) async fn update_storage_stats(
        &self,
        field_stats: &mut HashMap<u32, FieldStatistics>,
        dataset_schema: &Schema,
        scan_scheduler: Arc<ScanScheduler>,
    ) -> Result<()> {
        for reader in self
            .open_readers(dataset_schema, Some((scan_scheduler, 0)))
            .await?
        {
            reader.update_storage_stats(field_stats);
        }
        Ok(())
    }

    pub fn dataset(&self) -> &Dataset {
        self.dataset.as_ref()
    }
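
The v2 adapter's accounting above inverts field_id_to_column_idx and then walks the per-column file statistics, attributing any column without a mapping to the most recently seen field. An illustrative Python sketch of that logic, with hypothetical sizes and a field (id 1) that spans columns 1 and 2:

column_to_field = {0: 0, 1: 1}       # column 2 has no entry: continuation of field 1
column_sizes = [32, 24, 20]          # hypothetical per-column byte counts
field_bytes = {0: 0, 1: 0}
current_field = 0
for col, size in enumerate(column_sizes):
    current_field = column_to_field.get(col, current_field)
    if current_field in field_bytes:  # columns of dropped fields are skipped
        field_bytes[current_field] += size
assert field_bytes == {0: 32, 1: 44}
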
69 changes: 69 additions & 0 deletions rust/lance/src/dataset/statistics.rs
@@ -0,0 +1,69 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The Lance Authors

//! Module for statistics related to the dataset.

use std::{collections::HashMap, future::Future, sync::Arc};

use lance_core::Result;
use lance_io::scheduler::{ScanScheduler, SchedulerConfig};

use super::{fragment::FileFragment, Dataset};

/// Statistics about a single field in the dataset
pub struct FieldStatistics {
    /// Id of the field
    pub id: u32,
    /// Amount of data in the field (after compression, if any)
    ///
    /// This will be 0 if the data storage version is less than 2
    pub bytes_on_disk: u64,
}

/// Statistics about the data in the dataset
pub struct DataStatistics {
    /// Statistics about each field in the dataset
    pub fields: Vec<FieldStatistics>,
}

pub trait DatasetStatisticsExt {
    /// Get statistics about the data in the dataset
    fn calculate_data_stats(
        self: &Arc<Self>,
    ) -> impl Future<Output = Result<DataStatistics>> + Send;
}

impl DatasetStatisticsExt for Dataset {
    async fn calculate_data_stats(self: &Arc<Self>) -> Result<DataStatistics> {
        let field_ids = self.schema().field_ids();
        let mut field_stats: HashMap<u32, FieldStatistics> =
            HashMap::from_iter(field_ids.iter().map(|id| {
                (
                    *id as u32,
                    FieldStatistics {
                        id: *id as u32,
                        bytes_on_disk: 0,
                    },
                )
            }));
        if !self.is_legacy_storage() {
            let scan_scheduler = ScanScheduler::new(
                self.object_store.clone(),
                SchedulerConfig::max_bandwidth(self.object_store.as_ref()),
            );
            for fragment in self.fragments().as_ref() {
                let file_fragment = FileFragment::new(self.clone(), fragment.clone());
                file_fragment
                    .update_storage_stats(&mut field_stats, self.schema(), scan_scheduler.clone())
                    .await?;
            }
        }
        let field_stats = field_ids
            .into_iter()
            .map(|id| field_stats.remove(&(id as u32)).unwrap())
            .collect();
        Ok(DataStatistics {
            fields: field_stats,
        })
    }
}
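
calculate_data_stats seeds a zeroed entry for every field id in the schema, skips the per-fragment scan entirely for legacy (v1) datasets (so those report bytes_on_disk == 0), and finally re-emits the totals in schema order. The shape of that aggregation, as a small Python sketch with hypothetical inputs:

schema_field_ids = [0, 1, 2]
# hypothetical bytes-per-field reported by each fragment's readers
fragment_sizes = [{0: 40, 1: 44}, {2: 56}]
totals = {fid: 0 for fid in schema_field_ids}
for sizes in fragment_sizes:
    for fid, nbytes in sizes.items():
        if fid in totals:  # fields no longer in the schema are ignored
            totals[fid] += nbytes
# results are emitted in schema order
fields = [(fid, totals[fid]) for fid in schema_field_ids]
assert fields == [(0, 40), (1, 44), (2, 56)]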