Skip to content

Commit

Permalink
Add user_defined_sql_planners(..) to FunctionRegistry (#11296)
Browse files Browse the repository at this point in the history
* Add user_defined_sql_planners(..) to FunctionRegistry

* Adding simple test for user_defined_sql_planners

* Renamed user_defined_sql_planners to expr_planners
  • Loading branch information
Omega359 authored Jul 7, 2024
1 parent e693ed7 commit 9f8ba6a
Show file tree
Hide file tree
Showing 7 changed files with 43 additions and 3 deletions.
4 changes: 4 additions & 0 deletions datafusion/core/src/execution/context/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1392,6 +1392,10 @@ impl FunctionRegistry for SessionContext {
self.state.write().register_function_rewrite(rewrite)
}

fn expr_planners(&self) -> Vec<Arc<dyn UserDefinedSQLPlanner>> {
self.state.read().expr_planners()
}

fn register_user_defined_sql_planner(
&mut self,
user_defined_sql_planner: Arc<dyn UserDefinedSQLPlanner>,
Expand Down
4 changes: 4 additions & 0 deletions datafusion/core/src/execution/session_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1183,6 +1183,10 @@ impl FunctionRegistry for SessionState {
Ok(())
}

fn expr_planners(&self) -> Vec<Arc<dyn UserDefinedSQLPlanner>> {
self.user_defined_sql_planners.clone()
}

fn register_user_defined_sql_planner(
&mut self,
user_defined_sql_planner: Arc<dyn UserDefinedSQLPlanner>,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -571,6 +571,17 @@ async fn test_user_defined_functions_cast_to_i64() -> Result<()> {
Ok(())
}

#[tokio::test]
async fn test_user_defined_sql_functions() -> Result<()> {
let ctx = SessionContext::new();

let sql_planners = ctx.expr_planners();

assert!(!sql_planners.is_empty());

Ok(())
}

#[tokio::test]
async fn deregister_udf() -> Result<()> {
let cast2i64 = ScalarUDF::from(CastToI64UDF::new());
Expand Down
10 changes: 7 additions & 3 deletions datafusion/execution/src/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,15 @@ use std::{
sync::Arc,
};

use datafusion_common::{plan_datafusion_err, DataFusionError, Result};
use datafusion_expr::{AggregateUDF, ScalarUDF, WindowUDF};

use crate::{
config::SessionConfig,
memory_pool::MemoryPool,
registry::FunctionRegistry,
runtime_env::{RuntimeConfig, RuntimeEnv},
};
use datafusion_common::{plan_datafusion_err, DataFusionError, Result};
use datafusion_expr::planner::UserDefinedSQLPlanner;
use datafusion_expr::{AggregateUDF, ScalarUDF, WindowUDF};

/// Task Execution Context
///
Expand Down Expand Up @@ -191,6 +191,10 @@ impl FunctionRegistry for TaskContext {
});
Ok(self.scalar_functions.insert(udf.name().into(), udf))
}

fn expr_planners(&self) -> Vec<Arc<dyn UserDefinedSQLPlanner>> {
vec![]
}
}

#[cfg(test)]
Expand Down
7 changes: 7 additions & 0 deletions datafusion/expr/src/registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,9 @@ pub trait FunctionRegistry {
not_impl_err!("Registering FunctionRewrite")
}

/// Set of all registered [`UserDefinedSQLPlanner`]s
fn expr_planners(&self) -> Vec<Arc<dyn UserDefinedSQLPlanner>>;

/// Registers a new [`UserDefinedSQLPlanner`] with the registry.
fn register_user_defined_sql_planner(
&mut self,
Expand Down Expand Up @@ -192,4 +195,8 @@ impl FunctionRegistry for MemoryFunctionRegistry {
fn register_udwf(&mut self, udaf: Arc<WindowUDF>) -> Result<Option<Arc<WindowUDF>>> {
Ok(self.udwfs.insert(udaf.name().into(), udaf))
}

fn expr_planners(&self) -> Vec<Arc<dyn UserDefinedSQLPlanner>> {
vec![]
}
}
5 changes: 5 additions & 0 deletions datafusion/proto/src/bytes/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ use std::sync::Arc;
use datafusion::execution::registry::FunctionRegistry;
use datafusion::physical_plan::ExecutionPlan;
use datafusion::prelude::SessionContext;
use datafusion_expr::planner::UserDefinedSQLPlanner;

mod registry;

Expand Down Expand Up @@ -165,6 +166,10 @@ impl Serializeable for Expr {
"register_udwf called in Placeholder Registry!"
)
}

fn expr_planners(&self) -> Vec<Arc<dyn UserDefinedSQLPlanner>> {
vec![]
}
}
Expr::from_bytes_with_registry(&bytes, &PlaceHolderRegistry)?;

Expand Down
5 changes: 5 additions & 0 deletions datafusion/proto/src/bytes/registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use std::{collections::HashSet, sync::Arc};
use datafusion::execution::registry::FunctionRegistry;
use datafusion_common::plan_err;
use datafusion_common::Result;
use datafusion_expr::planner::UserDefinedSQLPlanner;
use datafusion_expr::{AggregateUDF, ScalarUDF, WindowUDF};

/// A default [`FunctionRegistry`] registry that does not resolve any
Expand Down Expand Up @@ -54,4 +55,8 @@ impl FunctionRegistry for NoRegistry {
fn register_udwf(&mut self, udwf: Arc<WindowUDF>) -> Result<Option<Arc<WindowUDF>>> {
plan_err!("No function registry provided to deserialize, so can not deserialize User Defined Window Function '{}'", udwf.inner().name())
}

fn expr_planners(&self) -> Vec<Arc<dyn UserDefinedSQLPlanner>> {
vec![]
}
}

0 comments on commit 9f8ba6a

Please sign in to comment.