Skip to content

Commit 2e2bf1a

Browse files
authored
fix: implement with_new_children for FTS (#3441)
this method isn't implemented before this and it causes panic, we don't actually need this but maybe some optimizing would call this method for rewriting the plans Signed-off-by: BubbleCal <bubble-cal@outlook.com>
1 parent 8a61b69 commit 2e2bf1a

File tree

2 files changed

+67
-13
lines changed

2 files changed

+67
-13
lines changed

rust/lance/src/dataset/scanner.rs

+3-3
Original file line numberDiff line numberDiff line change
@@ -1523,7 +1523,7 @@ impl Scanner {
15231523
.limit(self.limit);
15241524

15251525
// load indices
1526-
let mut column_inputs = HashMap::with_capacity(columns.len());
1526+
let mut column_inputs = Vec::with_capacity(columns.len());
15271527
for column in columns {
15281528
let index = self
15291529
.dataset
@@ -1571,12 +1571,12 @@ impl Scanner {
15711571
scan_node
15721572
};
15731573

1574-
column_inputs.insert(column.clone(), (index_uuids, unindexed_scan_node));
1574+
column_inputs.push((column.clone(), index_uuids, unindexed_scan_node));
15751575
}
15761576

15771577
let indices = column_inputs
15781578
.iter()
1579-
.map(|(col, (idx, _))| (col.clone(), idx.clone()))
1579+
.map(|(col, idx, _)| (col.clone(), idx.clone()))
15801580
.collect();
15811581
let prefilter_source = self.prefilter_source(filter_plan).await?;
15821582
let fts_plan = Arc::new(FtsExec::new(

rust/lance/src/io/exec/fts.rs

+64-10
Original file line numberDiff line numberDiff line change
@@ -99,9 +99,46 @@ impl ExecutionPlan for FtsExec {
9999

100100
fn with_new_children(
101101
self: Arc<Self>,
102-
_children: Vec<Arc<dyn ExecutionPlan>>,
102+
mut children: Vec<Arc<dyn ExecutionPlan>>,
103103
) -> DataFusionResult<Arc<dyn ExecutionPlan>> {
104-
todo!()
104+
let plan = match children.len() {
105+
0 => Self {
106+
dataset: self.dataset.clone(),
107+
indices: self.indices.clone(),
108+
query: self.query.clone(),
109+
prefilter_source: PreFilterSource::None,
110+
properties: self.properties.clone(),
111+
},
112+
1 => {
113+
let src = children.pop().unwrap();
114+
let prefilter_source = match &self.prefilter_source {
115+
PreFilterSource::FilteredRowIds(_) => {
116+
PreFilterSource::FilteredRowIds(src.clone())
117+
}
118+
PreFilterSource::ScalarIndexQuery(_) => {
119+
PreFilterSource::ScalarIndexQuery(src.clone())
120+
}
121+
PreFilterSource::None => {
122+
return Err(DataFusionError::Internal(
123+
"Unexpected prefilter source".to_string(),
124+
));
125+
}
126+
};
127+
Self {
128+
dataset: self.dataset.clone(),
129+
indices: self.indices.clone(),
130+
query: self.query.clone(),
131+
prefilter_source,
132+
properties: self.properties.clone(),
133+
}
134+
}
135+
_ => {
136+
return Err(DataFusionError::Internal(
137+
"Unexpected number of children".to_string(),
138+
));
139+
}
140+
};
141+
Ok(Arc::new(plan))
105142
}
106143

107144
#[instrument(name = "fts_exec", level = "debug", skip_all)]
@@ -194,8 +231,8 @@ impl ExecutionPlan for FtsExec {
194231
#[derive(Debug)]
195232
pub struct FlatFtsExec {
196233
dataset: Arc<Dataset>,
197-
// column -> (indices, unindexed input stream)
198-
column_inputs: HashMap<String, (Vec<Index>, Arc<dyn ExecutionPlan>)>,
234+
// (column, indices, unindexed input stream)
235+
column_inputs: Vec<(String, Vec<Index>, Arc<dyn ExecutionPlan>)>,
199236
query: FullTextSearchQuery,
200237
properties: PlanProperties,
201238
}
@@ -213,7 +250,7 @@ impl DisplayAs for FlatFtsExec {
213250
impl FlatFtsExec {
214251
pub fn new(
215252
dataset: Arc<Dataset>,
216-
column_inputs: HashMap<String, (Vec<Index>, Arc<dyn ExecutionPlan>)>,
253+
column_inputs: Vec<(String, Vec<Index>, Arc<dyn ExecutionPlan>)>,
217254
query: FullTextSearchQuery,
218255
) -> Self {
219256
let properties = PlanProperties::new(
@@ -246,16 +283,33 @@ impl ExecutionPlan for FlatFtsExec {
246283

247284
fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
248285
self.column_inputs
249-
.values()
250-
.map(|(_, input)| input)
286+
.iter()
287+
.map(|(_, _, input)| input)
251288
.collect()
252289
}
253290

254291
fn with_new_children(
255292
self: Arc<Self>,
256-
_children: Vec<Arc<dyn ExecutionPlan>>,
293+
children: Vec<Arc<dyn ExecutionPlan>>,
257294
) -> DataFusionResult<Arc<dyn ExecutionPlan>> {
258-
todo!()
295+
if self.column_inputs.len() != children.len() {
296+
return Err(DataFusionError::Internal(
297+
"Unexpected number of children".to_string(),
298+
));
299+
}
300+
301+
let column_inputs = self
302+
.column_inputs
303+
.iter()
304+
.zip(children)
305+
.map(|((column, indices, _), input)| (column.clone(), indices.clone(), input))
306+
.collect();
307+
Ok(Arc::new(Self {
308+
dataset: self.dataset.clone(),
309+
column_inputs,
310+
query: self.query.clone(),
311+
properties: self.properties.clone(),
312+
}))
259313
}
260314

261315
#[instrument(name = "flat_fts_exec", level = "debug", skip_all)]
@@ -269,7 +323,7 @@ impl ExecutionPlan for FlatFtsExec {
269323
let column_inputs = self.column_inputs.clone();
270324

271325
let stream = stream::iter(column_inputs)
272-
.map(move |(column, (indices, input))| {
326+
.map(move |(column, indices, input)| {
273327
let index_meta = indices[0].clone();
274328
let uuid = index_meta.uuid.to_string();
275329
let query = query.clone();

0 commit comments

Comments
 (0)