
Commit 949c6e7

feat: add support for explain analyze (#3484)
This adds runtime execution metrics to all of our exec nodes. The metrics can be accessed by calling analyze_plan() on a scanner.
1 parent 9e614b1 commit 949c6e7
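
For context, here is a minimal usage sketch of the new call from Python; the dataset path and filter are illustrative and not part of this commit. explain_plan() renders the plan without running it, while analyze_plan() executes the scan and returns the plan annotated with per-node runtime metrics (the tests below assert on counters such as output_rows).

import lance
import pyarrow as pa

# Illustrative data; any Lance dataset works.
table = pa.table({"a": range(100), "b": range(100)})
ds = lance.write_dataset(table, "/tmp/analyze_plan_demo")

# Static plan, not executed.
print(ds.scanner(filter="a < 50").explain_plan(verbose=True))

# Executes the scan and annotates each exec node with runtime metrics,
# e.g. LanceScan ... output_rows=100 and FilterExec ... output_rows=50.
print(ds.scanner(filter="a < 50").analyze_plan())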

File tree

13 files changed: +319 -45 lines changed


python/python/lance/dataset.py (+15)

@@ -3412,6 +3412,21 @@ def explain_plan(self, verbose=False) -> str:
 
         return self._scanner.explain_plan(verbose=verbose)
 
+    def analyze_plan(self) -> str:
+        """Execute the plan for this scanner and display with runtime metrics.
+
+        Parameters
+        ----------
+        verbose : bool, default False
+            Use a verbose output format.
+
+        Returns
+        -------
+        plan : str
+        """
+
+        return self._scanner.analyze_plan()
+
 
 class DatasetOptimizer:
     def __init__(self, dataset: LanceDataset):

python/python/lance/lance/__init__.pyi (+1)

@@ -322,6 +322,7 @@ class _Scanner:
     @property
     def schema(self) -> pa.Schema: ...
     def explain_plan(self, verbose: bool) -> str: ...
+    def analyze_plan(self) -> str: ...
     def count_rows(self) -> int: ...
     def to_pyarrow(self) -> pa.RecordBatchReader: ...

python/python/tests/test_dataset.py (+35)

@@ -788,6 +788,41 @@ def test_select_none(tmp_path: Path):
     ).explain_plan(True)
 
 
+def test_analyze_filtered_scan(tmp_path: Path):
+    table = pa.Table.from_pydict({"a": range(100), "b": range(100)})
+    base_dir = tmp_path / "test"
+    ds = lance.write_dataset(table, base_dir)
+    plan = ds.scanner(columns=[], filter="a < 50", with_row_id=True).analyze_plan()
+    print(plan)
+    assert re.search(r"^\s*LanceScan:.*output_rows=100.*$", plan, re.MULTILINE)
+    assert re.search(r"^\s*FilterExec:.*output_rows=50.*$", plan, re.MULTILINE)
+
+
+def test_analyze_index_scan(tmp_path: Path):
+    table = pa.table({"filter": range(100)})
+    dataset = lance.write_dataset(table, tmp_path)
+    dataset.create_scalar_index("filter", "BTREE")
+    plan = dataset.scanner(filter="filter = 10").analyze_plan()
+    assert "MaterializeIndex: query=filter = 10, metrics=[output_rows=1" in plan
+
+
+def test_analyze_vector_search(tmp_path: Path):
+    table = pa.Table.from_pydict(
+        {
+            "id": [i for i in range(10)],
+            "vector": pa.array(
+                [[1.0, 1.0] for _ in range(10)], pa.list_(pa.float32(), 2)
+            ),
+        }
+    )
+    dataset = lance.write_dataset(table, tmp_path / "dataset", mode="create")
+    dataset.delete("id = 0")
+    plan = dataset.scanner(
+        nearest={"column": "vector", "k": 10, "q": [1.0, 1.0]}
+    ).analyze_plan()
+    assert "KNNVectorDistance: metric=l2, metrics=[output_rows=10" in plan
+
+
 def test_get_fragments(tmp_path: Path):
     table = pa.Table.from_pydict({"a": range(100), "b": range(100)})
     base_dir = tmp_path / "test"

python/src/scanner.rs (+12)

@@ -68,6 +68,18 @@ impl Scanner {
         Ok(res)
     }
 
+    #[pyo3(signature = (*))]
+    fn analyze_plan(self_: PyRef<'_, Self>) -> PyResult<String> {
+        let scanner = self_.scanner.clone();
+        let res = RT
+            .spawn(Some(self_.py()), async move {
+                scanner.analyze_plan().await
+            })?
+            .map_err(|err| PyValueError::new_err(err.to_string()))?;
+
+        Ok(res)
+    }
+
     fn count_rows(self_: PyRef<'_, Self>) -> PyResult<u64> {
         let scanner = self_.scanner.clone();
         RT.spawn(Some(self_.py()), async move { scanner.count_rows().await })?

rust/lance/src/dataset/scanner.rs (+22)

@@ -13,10 +13,12 @@ use arrow_schema::{DataType, Field as ArrowField, Schema as ArrowSchema, SchemaR
 use arrow_select::concat::concat_batches;
 use async_recursion::async_recursion;
 use datafusion::common::SchemaExt;
+use datafusion::execution::TaskContext;
 use datafusion::functions_aggregate;
 use datafusion::functions_aggregate::count::count_udaf;
 use datafusion::logical_expr::Expr;
 use datafusion::physical_expr::PhysicalSortExpr;
+use datafusion::physical_plan::analyze::AnalyzeExec;
 use datafusion::physical_plan::coalesce_batches::CoalesceBatchesExec;
 use datafusion::physical_plan::empty::EmptyExec;
 use datafusion::physical_plan::expressions;
@@ -2356,6 +2358,26 @@ impl Scanner {
         ))
     }
 
+    #[instrument(level = "info", skip(self))]
+    pub async fn analyze_plan(&self) -> Result<String> {
+        let plan = self.create_plan().await?;
+        let schema = plan.schema();
+        let analyze = Arc::new(AnalyzeExec::new(true, true, plan, schema));
+        let ctx = Arc::new(TaskContext::default());
+        let mut stream = analyze.execute(0, ctx).map_err(|err| {
+            Error::io(
+                format!("Failed to execute analyze plan: {}", err),
+                location!(),
+            )
+        })?;
+
+        // fully execute the plan
+        while (stream.next().await).is_some() {}
+
+        let display = DisplayableExecutionPlan::with_metrics(analyze.as_ref());
+        Ok(format!("{}", display.indent(true)))
+    }
+
     #[instrument(level = "info", skip(self))]
     pub async fn explain_plan(&self, verbose: bool) -> Result<String> {
         let plan = self.create_plan().await?;

rust/lance/src/io/exec/fts.rs (+32 -10)

@@ -10,7 +10,7 @@ use datafusion::common::Statistics;
 use datafusion::error::{DataFusionError, Result as DataFusionResult};
 use datafusion::execution::SendableRecordBatchStream;
 use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
-use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
+use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
 use datafusion::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties};
 use datafusion_physical_expr::{EquivalenceProperties, Partitioning};
 use futures::stream::{self};
@@ -24,7 +24,9 @@ use tracing::instrument;
 use crate::index::prefilter::DatasetPreFilter;
 use crate::{index::DatasetIndexInternalExt, Dataset};
 
-use super::utils::{FilteredRowIdsToPrefilter, SelectionVectorToPrefilter};
+use super::utils::{
+    FilteredRowIdsToPrefilter, InstrumentedRecordBatchStreamAdapter, SelectionVectorToPrefilter,
+};
 use super::PreFilterSource;
 
 /// An execution node that performs full text search
@@ -41,6 +43,8 @@ pub struct FtsExec {
     /// Prefiltering input
     prefilter_source: PreFilterSource,
     properties: PlanProperties,
+
+    metrics: ExecutionPlanMetricsSet,
 }
 
 impl DisplayAs for FtsExec {
@@ -72,6 +76,7 @@ impl FtsExec {
             query,
             prefilter_source,
             properties,
+            metrics: ExecutionPlanMetricsSet::new(),
         }
     }
 }
@@ -108,6 +113,7 @@ impl ExecutionPlan for FtsExec {
                 query: self.query.clone(),
                 prefilter_source: PreFilterSource::None,
                 properties: self.properties.clone(),
+                metrics: ExecutionPlanMetricsSet::new(),
             },
             1 => {
                 let src = children.pop().unwrap();
@@ -130,6 +136,7 @@ impl ExecutionPlan for FtsExec {
                     query: self.query.clone(),
                     prefilter_source,
                     properties: self.properties.clone(),
+                    metrics: ExecutionPlanMetricsSet::new(),
                 }
             }
             _ => {
@@ -150,6 +157,7 @@ impl ExecutionPlan for FtsExec {
         let query = self.query.clone();
         let ds = self.dataset.clone();
        let prefilter_source = self.prefilter_source.clone();
+        let baseline_metrics = BaselineMetrics::new(&self.metrics, partition);
 
         let indices = self.indices.clone();
         let stream = stream::iter(indices)
@@ -208,16 +216,21 @@ impl ExecutionPlan for FtsExec {
             })
             .buffered(self.indices.len());
         let schema = self.schema();
-        Ok(
-            Box::pin(RecordBatchStreamAdapter::new(schema, stream.boxed()))
-                as SendableRecordBatchStream,
-        )
+        Ok(Box::pin(InstrumentedRecordBatchStreamAdapter::new(
+            schema,
+            stream.boxed(),
+            baseline_metrics,
+        )) as SendableRecordBatchStream)
     }
 
     fn statistics(&self) -> DataFusionResult<datafusion::physical_plan::Statistics> {
         Ok(Statistics::new_unknown(&FTS_SCHEMA))
     }
 
+    fn metrics(&self) -> Option<MetricsSet> {
+        Some(self.metrics.clone_inner())
+    }
+
     fn properties(&self) -> &PlanProperties {
         &self.properties
     }
@@ -235,6 +248,7 @@ pub struct FlatFtsExec {
     column_inputs: Vec<(String, Vec<Index>, Arc<dyn ExecutionPlan>)>,
     query: FullTextSearchQuery,
     properties: PlanProperties,
+    metrics: ExecutionPlanMetricsSet,
 }
 
 impl DisplayAs for FlatFtsExec {
@@ -264,6 +278,7 @@ impl FlatFtsExec {
             column_inputs,
             query,
             properties,
+            metrics: ExecutionPlanMetricsSet::new(),
         }
     }
 }
@@ -309,6 +324,7 @@ impl ExecutionPlan for FlatFtsExec {
             column_inputs,
             query: self.query.clone(),
             properties: self.properties.clone(),
+            metrics: ExecutionPlanMetricsSet::new(),
         }))
     }
 
@@ -321,6 +337,7 @@ impl ExecutionPlan for FlatFtsExec {
         let query = self.query.clone();
         let ds = self.dataset.clone();
         let column_inputs = self.column_inputs.clone();
+        let baseline_metrics = BaselineMetrics::new(&self.metrics, partition);
 
         let stream = stream::iter(column_inputs)
             .map(move |(column, indices, input)| {
@@ -353,16 +370,21 @@ impl ExecutionPlan for FlatFtsExec {
             .buffered(self.column_inputs.len())
             .try_flatten();
         let schema = self.schema();
-        Ok(
-            Box::pin(RecordBatchStreamAdapter::new(schema, stream.boxed()))
-                as SendableRecordBatchStream,
-        )
+        Ok(Box::pin(InstrumentedRecordBatchStreamAdapter::new(
+            schema,
+            stream.boxed(),
+            baseline_metrics,
+        )) as SendableRecordBatchStream)
     }
 
     fn statistics(&self) -> DataFusionResult<datafusion::physical_plan::Statistics> {
         Ok(Statistics::new_unknown(&FTS_SCHEMA))
    }
 
+    fn metrics(&self) -> Option<MetricsSet> {
+        Some(self.metrics.clone_inner())
+    }
+
     fn properties(&self) -> &PlanProperties {
         &self.properties
     }
