From ce3ad14b74ae5fae043d9058ac7d7fd39509b024 Mon Sep 17 00:00:00 2001 From: igalshilman Date: Wed, 16 Oct 2024 22:34:15 +0200 Subject: [PATCH] [datafusion] prepend physical_optimizer_rule before other rules This is a bug fix that was introduced in https://github.com/restatedev/restate/pull/2004 Which change the order that optimizers are registered. The reason for this is described as a comment few lines above. --- crates/storage-query-datafusion/src/context.rs | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/crates/storage-query-datafusion/src/context.rs b/crates/storage-query-datafusion/src/context.rs index 86afa16291..b3ef301c66 100644 --- a/crates/storage-query-datafusion/src/context.rs +++ b/crates/storage-query-datafusion/src/context.rs @@ -18,7 +18,6 @@ use datafusion::error::DataFusionError; use datafusion::execution::context::SQLOptions; use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv}; use datafusion::execution::SessionStateBuilder; -use datafusion::physical_optimizer::PhysicalOptimizerRule; use datafusion::physical_plan::SendableRecordBatchStream; use datafusion::prelude::{SessionConfig, SessionContext}; @@ -159,6 +158,7 @@ impl QueryContext { Ok(ctx) } + #[allow(deprecated)] fn new( memory_limit: usize, temp_folder: Option, @@ -225,12 +225,14 @@ impl QueryContext { // A far more involved but potentially more robust solution would be wrap the SymmetricHashJoin in a ProjectionExec // If this would become an issue for any reason, then we can explore that alternative. // - let physical_optimizers: Vec> = - vec![Arc::new(physical_optimizer::JoinRewrite::new())]; - state_builder = state_builder.with_physical_optimizer_rules(physical_optimizers); + let mut state = state_builder.build(); - let state = state_builder.build(); + let join_rewrite = Arc::new(physical_optimizer::JoinRewrite::new()); + let mut optimizers = state.physical_optimizers().to_vec(); + optimizers.insert(0, join_rewrite); + + state = state.with_physical_optimizer_rules(optimizers); let ctx = SessionContext::new_with_state(state);