Skip to content

Commit

Permalink
End to end udf registration working
Browse files Browse the repository at this point in the history
  • Loading branch information
matthewmturner committed Jan 29, 2025
1 parent 84cb7cd commit b7f1fb4
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 15 deletions.
2 changes: 1 addition & 1 deletion src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,7 @@ pub struct IcebergConfig {
/// Configuration entry describing a single function exposed by a WASM module.
///
/// The struct derives `Deserialize`, so it is populated from deserialized
/// configuration. The UDF registration code visible in this commit looks up
/// `name` in the instantiated module and converts the type strings into
/// Arrow `DataType`s via `try_into`.
#[derive(Clone, Debug, Deserialize)]
pub struct WasmFuncDetails {
/// Name of the function; registration looks it up in the instantiated module
/// with `get_func` and also uses it as the UDF name.
pub name: String,
// NOTE(review): this text is a rendered diff (1 addition & 1 deletion). The
// `String` declaration below appears to be the pre-commit line that the
// `Vec<String>` declaration after it replaces - confirm against the real file.
pub input_types: String,
/// Type names of the function's arguments; each entry is converted to a
/// `DataType` with `try_into` when the UDF signature is built.
pub input_types: Vec<String>,
/// Type name of the function's return value, converted to a `DataType` with
/// `try_into` when the UDF signature is built.
pub return_type: String,
}

Expand Down
40 changes: 30 additions & 10 deletions src/execution/wasm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,12 @@ use std::sync::Arc;

use color_eyre::{eyre::eyre, Result};
use datafusion::{
arrow::datatypes::DataType,
arrow::{array::ArrayRef, datatypes::DataType},
logical_expr::{ColumnarValue, ScalarUDF, Volatility},
prelude::create_udf,
};
use datafusion_common::{DataFusionError, Result as DFResult, ScalarValue};
use log::error;
use datafusion_common::{DataFusionError, Result as DFResult};
use log::{error, info};
use wasmtime::{Instance, Module, Store};

use crate::config::{WasmFuncDetails, WasmUdfConfig};
Expand All @@ -34,30 +34,45 @@ pub fn udf_signature_from_func_details(
) -> Result<(Vec<DataType>, DataType)> {
let input_types: Result<Vec<DataType>> = func_details
.input_types
.as_str()
.split(",")
.iter()
.map(|s| {
let t: DataType = s.try_into()?;
let t: DataType = s.as_str().try_into()?;
Ok(t)
})
.collect();
let return_type: DataType = func_details.return_type.as_str().try_into()?;
Ok((input_types?, return_type))
}

/// Checks that a UDF invocation supplies as many arguments as the function declares.
///
/// Only the lengths of `args` and `input_types` are compared; the types of the
/// individual arguments are not inspected.
///
/// # Errors
///
/// Returns [`DataFusionError::Execution`] when the two slices differ in length.
fn validate_args(args: &[ColumnarValue], input_types: &[DataType]) -> DFResult<()> {
    let (supplied, declared) = (args.len(), input_types.len());
    if supplied == declared {
        Ok(())
    } else {
        Err(DataFusionError::Execution(String::from(
            "The number of arguments is incorrect",
        )))
    }
}

/// Builds the closure that backs a WASM scalar UDF.
///
/// The returned closure takes the UDF arguments and, on every call:
/// 1. validates the argument count against `input_types` via `validate_args`,
/// 2. creates a fresh `Store`, compiles `module_bytes` into a `Module` and
///    instantiates it,
/// 3. converts the arguments to arrays and returns the first one unchanged.
///
/// `return_type` is accepted but not read anywhere in the body shown here, and
/// the created `instance` is not used after instantiation, so no WASM function
/// is actually invoked by this code.
///
/// # Errors
///
/// The closure returns `DataFusionError::Execution` when the argument count is
/// wrong, `DataFusionError::Internal` when the module cannot be loaded or
/// instantiated, and propagates any error from `values_to_arrays`.
fn create_wasm_udf_impl(
module_bytes: Vec<u8>,
input_types: Vec<DataType>,
return_type: DataType,
) -> impl Fn(&[ColumnarValue]) -> DFResult<ColumnarValue> {
move |args: &[ColumnarValue]| {
// Reject calls whose argument count differs from the declared signature.
validate_args(args, &input_types)?;
// Rebuild the WASM module and instance from the raw bytes for this call.
// NOTE(review): this compiles the module on every invocation; consider
// compiling once outside the closure if that proves expensive.
let mut store = Store::<()>::default();

let module = Module::from_binary(store.engine(), &module_bytes)
.map_err(|e| DataFusionError::Internal(format!("Error loading module: {e:?}")))?;

// NOTE(review): `instance` is never used below in this view.
let instance = Instance::new(&mut store, &module, &[])
.map_err(|e| DataFusionError::Internal(format!("Error instantiating module: {e:?}")))?;
// NOTE(review): this text is a rendered diff; the `Ok(...Scalar(ScalarValue::Null))`
// line below appears to be the pre-commit return that the lines after it
// replace (the `ScalarValue` import is dropped in the same commit) - confirm.
Ok(ColumnarValue::Scalar(ScalarValue::Null))

let vals = ColumnarValue::values_to_arrays(args)?;
// NOTE(review): `vals[0]` panics if `vals` is empty. `validate_args` only
// compares lengths, so a UDF declared with zero input types and called with
// zero arguments would pass validation - presumably `values_to_arrays` then
// yields an empty Vec; confirm and consider `vals.first()` with an error.
let first = &vals[0];
// Placeholder result: echo the first argument back as an array.
Ok(ColumnarValue::Array(Arc::clone(first)))
}
}

Expand All @@ -77,7 +92,12 @@ pub fn create_wasm_udfs(wasm_udf_config: &WasmUdfConfig) -> Result<Vec<ScalarUDF
if instance.get_func(&mut store, &func_details.name).is_none() {
error!("WASM function {} is missing in module", &func_details.name);
} else {
let udf_impl = create_wasm_udf_impl(module_bytes.to_owned());
let udf_impl = create_wasm_udf_impl(
module_bytes.to_owned(),
input_types.clone(),
return_type.clone(),
);
info!("Registering WASM function {} with input {input_types:?} and return_type {return_type:?}", &func_details.name);
let udf = create_udf(
&func_details.name,
input_types,
Expand Down
7 changes: 3 additions & 4 deletions src/tui/state/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ pub mod tabs;

use crate::tui::state::tabs::sql::SQLTabState;
use crate::tui::ui::SelectedTab;
use log::{debug, error, info};
use log::{debug, error};
use std::path::PathBuf;

use self::tabs::{history::HistoryTabState, logs::LogsTabState};
Expand Down Expand Up @@ -54,8 +54,7 @@ pub struct AppState<'app> {
}

pub fn initialize<'app>(config_path: PathBuf) -> AppState<'app> {
debug!("Initializing state");
debug!("Config path: {:?}", config_path);
debug!("Initializing state from config path: {:?}", config_path);
let config = if config_path.exists() {
debug!("Config exists");
let maybe_config_contents = std::fs::read_to_string(config_path);
Expand All @@ -64,7 +63,7 @@ pub fn initialize<'app>(config_path: PathBuf) -> AppState<'app> {
toml::from_str(&config_contents);
match maybe_parsed_config {
Ok(parsed_config) => {
info!("Parsed config: {:?}", parsed_config);
debug!("Parsed config: {:?}", parsed_config);
parsed_config
}
Err(err) => {
Expand Down

0 comments on commit b7f1fb4

Please sign in to comment.