From ce3ad14b74ae5fae043d9058ac7d7fd39509b024 Mon Sep 17 00:00:00 2001 From: igalshilman Date: Wed, 16 Oct 2024 22:34:15 +0200 Subject: [PATCH 1/2] [datafusion] prepend physical_optimizer_rule before other rules This fixes a bug introduced in https://github.com/restatedev/restate/pull/2004, which changed the order in which optimizers are registered. The reason the order matters is described in a comment a few lines above the change. --- crates/storage-query-datafusion/src/context.rs | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/crates/storage-query-datafusion/src/context.rs b/crates/storage-query-datafusion/src/context.rs index 86afa16291..b3ef301c66 100644 --- a/crates/storage-query-datafusion/src/context.rs +++ b/crates/storage-query-datafusion/src/context.rs @@ -18,7 +18,6 @@ use datafusion::error::DataFusionError; use datafusion::execution::context::SQLOptions; use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv}; use datafusion::execution::SessionStateBuilder; -use datafusion::physical_optimizer::PhysicalOptimizerRule; use datafusion::physical_plan::SendableRecordBatchStream; use datafusion::prelude::{SessionConfig, SessionContext}; @@ -159,6 +158,7 @@ impl QueryContext { Ok(ctx) } + #[allow(deprecated)] fn new( memory_limit: usize, temp_folder: Option, @@ -225,12 +225,14 @@ impl QueryContext { // A far more involved but potentially more robust solution would be wrap the SymmetricHashJoin in a ProjectionExec // If this would become an issue for any reason, then we can explore that alternative. 
// - let physical_optimizers: Vec> = - vec![Arc::new(physical_optimizer::JoinRewrite::new())]; - state_builder = state_builder.with_physical_optimizer_rules(physical_optimizers); + let mut state = state_builder.build(); - let state = state_builder.build(); + let join_rewrite = Arc::new(physical_optimizer::JoinRewrite::new()); + let mut optimizers = state.physical_optimizers().to_vec(); + optimizers.insert(0, join_rewrite); + + state = state.with_physical_optimizer_rules(optimizers); let ctx = SessionContext::new_with_state(state); From 29c75f6eba14d9f79bfa78990c333e04b3e1e0af Mon Sep 17 00:00:00 2001 From: igalshilman Date: Thu, 17 Oct 2024 10:57:24 +0200 Subject: [PATCH 2/2] [datafusion] Use the builder instead of a deprecated method --- crates/storage-query-datafusion/src/context.rs | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/crates/storage-query-datafusion/src/context.rs b/crates/storage-query-datafusion/src/context.rs index b3ef301c66..796e6a6619 100644 --- a/crates/storage-query-datafusion/src/context.rs +++ b/crates/storage-query-datafusion/src/context.rs @@ -18,6 +18,7 @@ use datafusion::error::DataFusionError; use datafusion::execution::context::SQLOptions; use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv}; use datafusion::execution::SessionStateBuilder; +use datafusion::physical_optimizer::optimizer::PhysicalOptimizer; use datafusion::physical_plan::SendableRecordBatchStream; use datafusion::prelude::{SessionConfig, SessionContext}; @@ -158,7 +159,6 @@ impl QueryContext { Ok(ctx) } - #[allow(deprecated)] fn new( memory_limit: usize, temp_folder: Option, @@ -225,15 +225,14 @@ impl QueryContext { // A far more involved but potentially more robust solution would be wrap the SymmetricHashJoin in a ProjectionExec // If this would become an issue for any reason, then we can explore that alternative. 
// - - let mut state = state_builder.build(); - let join_rewrite = Arc::new(physical_optimizer::JoinRewrite::new()); - let mut optimizers = state.physical_optimizers().to_vec(); - optimizers.insert(0, join_rewrite); + let mut default_physical_optimizer_rules = PhysicalOptimizer::default().rules; + default_physical_optimizer_rules.insert(0, join_rewrite); - state = state.with_physical_optimizer_rules(optimizers); + state_builder = + state_builder.with_physical_optimizer_rules(default_physical_optimizer_rules); + let state = state_builder.build(); let ctx = SessionContext::new_with_state(state); let sql_options = SQLOptions::new()