@@ -17,12 +17,12 @@ use datafusion::{
17
17
TaskContext ,
18
18
} ,
19
19
physical_plan:: {
20
- streaming:: PartitionStream , DisplayAs , DisplayFormatType , ExecutionPlan ,
20
+ streaming:: PartitionStream , DisplayAs , DisplayFormatType , ExecutionPlan , PlanProperties ,
21
21
SendableRecordBatchStream ,
22
22
} ,
23
23
} ;
24
- use datafusion_common:: DataFusionError ;
25
- use datafusion_physical_expr:: Partitioning ;
24
+ use datafusion_common:: { DataFusionError , Statistics } ;
25
+ use datafusion_physical_expr:: { EquivalenceProperties , Partitioning } ;
26
26
27
27
use lance_arrow:: SchemaExt ;
28
28
use lance_core:: Result ;
@@ -32,11 +32,15 @@ use log::{info, warn};
32
32
///
33
33
/// It can only be used once, and will return the stream. After that the node
34
34
/// is exhausted.
35
+ ///
36
+ /// Note: the stream should be finite, otherwise we will report datafusion properties
37
+ /// incorrectly.
35
38
pub struct OneShotExec {
36
39
stream : Mutex < Option < SendableRecordBatchStream > > ,
37
40
// We save off a copy of the schema to speed up formatting and so ExecutionPlan::schema & display_as
38
41
// can still function after the stream is exhausted
39
42
schema : Arc < ArrowSchema > ,
43
+ properties : PlanProperties ,
40
44
}
41
45
42
46
impl OneShotExec {
@@ -45,7 +49,12 @@ impl OneShotExec {
45
49
let schema = stream. schema ( ) . clone ( ) ;
46
50
Self {
47
51
stream : Mutex :: new ( Some ( stream) ) ,
48
- schema,
52
+ schema : schema. clone ( ) ,
53
+ properties : PlanProperties :: new (
54
+ EquivalenceProperties :: new ( schema) ,
55
+ Partitioning :: RoundRobinBatch ( 1 ) ,
56
+ datafusion:: physical_plan:: ExecutionMode :: Bounded ,
57
+ ) ,
49
58
}
50
59
}
51
60
}
@@ -96,14 +105,6 @@ impl ExecutionPlan for OneShotExec {
96
105
self . schema . clone ( )
97
106
}
98
107
99
- fn output_partitioning ( & self ) -> datafusion_physical_expr:: Partitioning {
100
- Partitioning :: RoundRobinBatch ( 1 )
101
- }
102
-
103
- fn output_ordering ( & self ) -> Option < & [ datafusion_physical_expr:: PhysicalSortExpr ] > {
104
- None
105
- }
106
-
107
108
fn children ( & self ) -> Vec < Arc < dyn ExecutionPlan > > {
108
109
vec ! [ ]
109
110
}
@@ -135,7 +136,11 @@ impl ExecutionPlan for OneShotExec {
135
136
}
136
137
137
138
fn statistics ( & self ) -> datafusion_common:: Result < datafusion_common:: Statistics > {
138
- todo ! ( )
139
+ Ok ( Statistics :: new_unknown ( & self . schema ) )
140
+ }
141
+
142
+ fn properties ( & self ) -> & datafusion:: physical_plan:: PlanProperties {
143
+ & self . properties
139
144
}
140
145
}
141
146
@@ -194,7 +199,7 @@ pub fn execute_plan(
194
199
let session_state = SessionState :: new_with_config_rt ( session_config, runtime_env) ;
195
200
// NOTE: we are only executing the first partition here. Therefore, if
196
201
// the plan has more than one partition, we will be missing data.
197
- assert_eq ! ( plan. output_partitioning ( ) . partition_count( ) , 1 ) ;
202
+ assert_eq ! ( plan. properties ( ) . partitioning . partition_count( ) , 1 ) ;
198
203
Ok ( plan. execute ( 0 , session_state. task_ctx ( ) ) ?)
199
204
}
200
205
0 commit comments