Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add global counters for bytes_read & iops for benchmarking utility #3321

Merged
merged 1 commit into from
Jan 2, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions python/python/lance/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
write_dataset,
)
from .fragment import FragmentMetadata, LanceFragment
from .lance import bytes_read_counter, iops_counter
from .schema import json_to_schema, schema_to_json
from .util import sanitize_ts

Expand All @@ -43,6 +44,8 @@
"MergeInsertBuilder",
"Transaction",
"__version__",
"bytes_read_counter",
"iops_counter",
"write_dataset",
"schema_to_json",
"json_to_schema",
Expand Down
2 changes: 2 additions & 0 deletions python/python/lance/lance/__init__.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -376,6 +376,8 @@ class _Fragment:
@property
def num_deletions(self) -> int: ...

def iops_counter() -> int: ...
def bytes_read_counter() -> int: ...
def _write_dataset(
reader: pa.RecordBatchReader, uri: str | Path | _Dataset, params: Dict[str, Any]
): ...
Expand Down
9 changes: 9 additions & 0 deletions python/python/tests/test_lance.py
Original file line number Diff line number Diff line change
Expand Up @@ -239,3 +239,12 @@ def test_roundtrip_schema(tmp_path):
data = pa.table({"a": [1.0, 2.0]}).to_batches()
dataset = lance.write_dataset(data, tmp_path, schema=schema)
assert dataset.schema == schema


def test_io_counters(tmp_path):
starting_iops = lance.iops_counter()
starting_bytes = lance.bytes_read_counter()
dataset = lance.write_dataset(pa.table({"a": [1, 2, 3]}), tmp_path)
dataset.to_table()
assert lance.iops_counter() > starting_iops
assert lance.bytes_read_counter() > starting_bytes
12 changes: 12 additions & 0 deletions python/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,8 @@ fn lance(py: Python, m: &Bound<'_, PyModule>) -> PyResult<()> {
m.add_wrapped(wrap_pyfunction!(read_tfrecord))?;
m.add_wrapped(wrap_pyfunction!(trace_to_chrome))?;
m.add_wrapped(wrap_pyfunction!(manifest_needs_migration))?;
m.add_wrapped(wrap_pyfunction!(bytes_read_counter))?;
m.add_wrapped(wrap_pyfunction!(iops_counter))?;
// Debug functions
m.add_wrapped(wrap_pyfunction!(debug::format_schema))?;
m.add_wrapped(wrap_pyfunction!(debug::format_manifest))?;
Expand All @@ -154,6 +156,16 @@ fn lance(py: Python, m: &Bound<'_, PyModule>) -> PyResult<()> {
Ok(())
}

#[pyfunction(name = "iops_counter")]
fn iops_counter() -> PyResult<u64> {
Ok(::lance::io::iops_counter())
}

#[pyfunction(name = "bytes_read_counter")]
fn bytes_read_counter() -> PyResult<u64> {
Ok(::lance::io::bytes_read_counter())
}

#[pyfunction(name = "_schema_to_json")]
fn schema_to_json(schema: PyArrowType<ArrowSchema>) -> PyResult<String> {
schema.0.to_json().map_err(|e| {
Expand Down
19 changes: 13 additions & 6 deletions rust/lance-core/src/datatypes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -214,19 +214,26 @@ impl TryFrom<&LogicalType> for DataType {
let splits = lt.0.split(':').collect::<Vec<_>>();
match splits[0] {
"fixed_size_list" => {
if splits.len() != 3 {
if splits.len() < 3 {
return Err(Error::Schema {
message: format!("Unsupported logical type: {}", lt),
location: location!(),
});
}

let size: i32 = splits[2].parse::<i32>().map_err(|e: _| Error::Schema {
message: e.to_string(),
location: location!(),
})?;
let size: i32 =
splits
.last()
.unwrap()
.parse::<i32>()
.map_err(|e: _| Error::Schema {
message: e.to_string(),
location: location!(),
})?;

let inner_type = splits[1..splits.len() - 1].join(":");

match splits[1] {
match inner_type.as_str() {
BFLOAT16_EXT_NAME => {
let field = ArrowField::new("item", Self::FixedSizeBinary(2), true)
.with_metadata(
Expand Down
2 changes: 2 additions & 0 deletions rust/lance-io/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ pub mod testing;
pub mod traits;
pub mod utils;

pub use scheduler::{bytes_read_counter, iops_counter};

/// Defines a selection of rows to read from a file/batch
#[derive(Debug, Clone)]
pub enum ReadBatchParams {
Expand Down
15 changes: 15 additions & 0 deletions rust/lance-io/src/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,19 @@ const BACKPRESSURE_MIN: u64 = 5;
// Don't log backpressure warnings more than once / minute
const BACKPRESSURE_DEBOUNCE: u64 = 60;

// Global counter of how many IOPS we have issued
static IOPS_COUNTER: AtomicU64 = AtomicU64::new(0);
// Global counter of how many bytes were read by the scheduler
static BYTES_READ_COUNTER: AtomicU64 = AtomicU64::new(0);

pub fn iops_counter() -> u64 {
IOPS_COUNTER.load(Ordering::Acquire)
}

pub fn bytes_read_counter() -> u64 {
BYTES_READ_COUNTER.load(Ordering::Acquire)
}

// There are two structures that control the I/O scheduler concurrency. First,
// we have a hard limit on the number of IOPS that can be issued concurrently.
// This limit is process-wide.
Expand Down Expand Up @@ -456,6 +469,8 @@ impl IoTask {
let bytes_fut = self
.reader
.get_range(self.to_read.start as usize..self.to_read.end as usize);
IOPS_COUNTER.fetch_add(1, Ordering::Release);
BYTES_READ_COUNTER.fetch_add(self.num_bytes(), Ordering::Release);
bytes_fut.await.map_err(Error::from)
};
IOPS_QUOTA.release();
Expand Down
1 change: 1 addition & 0 deletions rust/lance/src/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ pub mod commit;
pub mod exec;

pub use lance_io::{
bytes_read_counter, iops_counter,
object_store::{ObjectStore, ObjectStoreParams, ObjectStoreRegistry, WrappingObjectStore},
stream::RecordBatchStream,
};
Loading