Skip to content

Commit

Permalink
feat(connect): add temporal functions (#3799)
Browse files Browse the repository at this point in the history
  • Loading branch information
universalmind303 authored Feb 12, 2025
1 parent ca36593 commit 8f10933
Show file tree
Hide file tree
Showing 6 changed files with 186 additions and 104 deletions.
12 changes: 8 additions & 4 deletions src/daft-connect/src/functions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,13 @@ use daft_dsl::{
ExprRef,
};
use once_cell::sync::Lazy;
use partition_transform::PartitionTransformFunctions;
use spark_connect::Expression;

use crate::{error::ConnectResult, invalid_argument_err, spark_analyzer::SparkAnalyzer};
mod aggregate;
mod core;
mod datetime;

mod math;
mod partition_transform;
mod string;
Expand All @@ -19,8 +20,9 @@ pub(crate) static CONNECT_FUNCTIONS: Lazy<SparkFunctions> = Lazy::new(|| {
let mut functions = SparkFunctions::new();
functions.register::<aggregate::AggregateFunctions>();
functions.register::<core::CoreFunctions>();
functions.register::<datetime::DatetimeFunctions>();
functions.register::<math::MathFunctions>();
functions.register::<PartitionTransformFunctions>();
functions.register::<partition_transform::PartitionTransformFunctions>();
functions.register::<string::StringFunctions>();
functions
});
Expand Down Expand Up @@ -104,8 +106,10 @@ impl SparkFunction for UnaryFunction {
}
}

struct Todo;
impl SparkFunction for Todo {
#[allow(non_camel_case_types)]
struct TODO_FUNCTION;

impl SparkFunction for TODO_FUNCTION {
fn to_expr(
&self,
_args: &[Expression],
Expand Down
27 changes: 13 additions & 14 deletions src/daft-connect/src/functions/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use daft_functions::{coalesce::Coalesce, float::IsNan};
use daft_sql::sql_expr;
use spark_connect::Expression;

use super::{FunctionModule, SparkFunction, Todo, UnaryFunction};
use super::{FunctionModule, SparkFunction, UnaryFunction, TODO_FUNCTION};
use crate::{
error::{ConnectError, ConnectResult},
invalid_argument_err,
Expand Down Expand Up @@ -32,27 +32,26 @@ impl FunctionModule for CoreFunctions {
parent.add_fn("^", BinaryOpFunction(Operator::Xor));
parent.add_fn("<<", BinaryOpFunction(Operator::ShiftLeft));
parent.add_fn(">>", BinaryOpFunction(Operator::ShiftRight));

// Normal Functions
// https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/functions.html#normal-functions

parent.add_fn("coalesce", Coalesce {});
parent.add_fn("input_file_name", Todo);
parent.add_fn("input_file_name", TODO_FUNCTION);
parent.add_fn("isnan", IsNan {});
parent.add_fn("isnull", UnaryFunction(|arg| arg.is_null()));

parent.add_fn("monotically_increasing_id", Todo);
parent.add_fn("named_struct", Todo);
parent.add_fn("nanvl", Todo);
parent.add_fn("rand", Todo);
parent.add_fn("randn", Todo);
parent.add_fn("spark_partition_id", Todo);
parent.add_fn("when", Todo);
parent.add_fn("bitwise_not", Todo);
parent.add_fn("bitwiseNOT", Todo);
parent.add_fn("monotically_increasing_id", TODO_FUNCTION);
parent.add_fn("named_struct", TODO_FUNCTION);
parent.add_fn("nanvl", TODO_FUNCTION);
parent.add_fn("rand", TODO_FUNCTION);
parent.add_fn("randn", TODO_FUNCTION);
parent.add_fn("spark_partition_id", TODO_FUNCTION);
parent.add_fn("when", TODO_FUNCTION);
parent.add_fn("bitwise_not", TODO_FUNCTION);
parent.add_fn("bitwiseNOT", TODO_FUNCTION);
parent.add_fn("expr", SqlExpr);
parent.add_fn("greatest", Todo);
parent.add_fn("least", Todo);
parent.add_fn("greatest", TODO_FUNCTION);
parent.add_fn("least", TODO_FUNCTION);

// parent.add_fn("isnan", UnaryFunction(|arg| arg.is_nan()));

Expand Down
78 changes: 78 additions & 0 deletions src/daft-connect/src/functions/datetime.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
use daft_core::datatypes::TimeUnit;
use daft_functions::temporal::{Day, DayOfWeek, Hour, Minute, Month, Second, Year};
use daft_schema::dtype::DataType;

use super::{FunctionModule, UnaryFunction, TODO_FUNCTION};

/// Function module that registers Spark's date and timestamp functions by name
/// (see its `FunctionModule` implementation for the full list).
///
/// Spark reference for the functions covered here:
/// <https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/functions.html#datetime-functions>
pub struct DatetimeFunctions;

impl FunctionModule for DatetimeFunctions {
    /// Adds every Spark datetime function name to `parent`.
    ///
    /// Names are kept in the same (alphabetical) order as the Spark reference
    /// page so the list is easy to audit against it. A name bound to
    /// `TODO_FUNCTION` is only a placeholder registration; the remaining names
    /// are bound either directly to a Daft temporal function or to a cast
    /// wrapped in `UnaryFunction`.
    fn register(parent: &mut super::SparkFunctions) {
        parent.add_fn("add_months", TODO_FUNCTION);
        parent.add_fn("convert_timezone", TODO_FUNCTION);
        parent.add_fn("curdate", TODO_FUNCTION);
        parent.add_fn("current_date", TODO_FUNCTION);
        parent.add_fn("current_timestamp", TODO_FUNCTION);
        parent.add_fn("current_timezone", TODO_FUNCTION);
        parent.add_fn("date_add", TODO_FUNCTION);
        parent.add_fn("date_diff", TODO_FUNCTION);
        parent.add_fn("date_format", TODO_FUNCTION);
        parent.add_fn("date_from_unix_date", TODO_FUNCTION);
        parent.add_fn("date_part", TODO_FUNCTION);
        parent.add_fn("date_sub", TODO_FUNCTION);
        parent.add_fn("date_trunc", TODO_FUNCTION);
        parent.add_fn("dateadd", TODO_FUNCTION);
        parent.add_fn("datediff", TODO_FUNCTION);
        parent.add_fn("datepart", TODO_FUNCTION);
        parent.add_fn("day", Day);
        // Spark documents `dayofmonth` and `day` as the same operation
        // (day of the month), so both names share the Daft implementation.
        parent.add_fn("dayofmonth", Day);
        // NOTE(review): Spark's `dayofweek` is documented as 1 = Sunday ..
        // 7 = Saturday, whereas Daft's day-of-week is documented as
        // 0 = Monday .. 6 = Sunday. Binding is left as-is to preserve current
        // behaviour; confirm whether an offset is needed for Spark parity.
        parent.add_fn("dayofweek", DayOfWeek);
        parent.add_fn("dayofyear", TODO_FUNCTION);
        parent.add_fn("extract", TODO_FUNCTION);
        parent.add_fn("from_unixtime", TODO_FUNCTION);
        parent.add_fn("from_utc_timestamp", TODO_FUNCTION);
        parent.add_fn("hour", Hour);
        parent.add_fn("last_day", TODO_FUNCTION);
        parent.add_fn("localtimestamp", TODO_FUNCTION);
        parent.add_fn("make_date", TODO_FUNCTION);
        parent.add_fn("make_dt_interval", TODO_FUNCTION);
        parent.add_fn("make_interval", TODO_FUNCTION);
        parent.add_fn("make_timestamp", TODO_FUNCTION);
        parent.add_fn("make_timestamp_ltz", TODO_FUNCTION);
        parent.add_fn("make_timestamp_ntz", TODO_FUNCTION);
        parent.add_fn("make_ym_interval", TODO_FUNCTION);
        parent.add_fn("minute", Minute);
        parent.add_fn("month", Month);
        parent.add_fn("months_between", TODO_FUNCTION);
        parent.add_fn("next_day", TODO_FUNCTION);
        parent.add_fn("now", TODO_FUNCTION);
        parent.add_fn("quarter", TODO_FUNCTION);
        parent.add_fn("second", Second);
        parent.add_fn("session_window", TODO_FUNCTION);
        parent.add_fn("timestamp_micros", TODO_FUNCTION);
        parent.add_fn("timestamp_millis", TODO_FUNCTION);
        parent.add_fn("timestamp_seconds", TODO_FUNCTION);
        // Only the single-argument form is wired up here (a plain cast);
        // Spark's optional `format` argument is not handled by this binding.
        parent.add_fn("to_date", UnaryFunction(|arg| arg.cast(&DataType::Date)));
        // NOTE(review): this casts to millisecond precision with no timezone;
        // Spark timestamps are microsecond precision — confirm the unit is
        // intentional.
        parent.add_fn(
            "to_timestamp",
            UnaryFunction(|arg| arg.cast(&DataType::Timestamp(TimeUnit::Milliseconds, None))),
        );
        parent.add_fn("to_timestamp_ltz", TODO_FUNCTION);
        parent.add_fn("to_timestamp_ntz", TODO_FUNCTION);
        parent.add_fn("to_unix_timestamp", TODO_FUNCTION);
        parent.add_fn("to_utc_timestamp", TODO_FUNCTION);
        parent.add_fn("trunc", TODO_FUNCTION);
        parent.add_fn("try_to_timestamp", TODO_FUNCTION);
        parent.add_fn("unix_date", TODO_FUNCTION);
        parent.add_fn("unix_micros", TODO_FUNCTION);
        parent.add_fn("unix_millis", TODO_FUNCTION);
        parent.add_fn("unix_seconds", TODO_FUNCTION);
        parent.add_fn("unix_timestamp", TODO_FUNCTION);
        // Spark's `weekday` is 0 = Monday .. 6 = Sunday, which is the same
        // numbering Daft documents for its day-of-week function.
        parent.add_fn("weekday", DayOfWeek);
        parent.add_fn("weekofyear", TODO_FUNCTION);
        parent.add_fn("window", TODO_FUNCTION);
        parent.add_fn("window_time", TODO_FUNCTION);
        parent.add_fn("year", Year);
    }
}
78 changes: 39 additions & 39 deletions src/daft-connect/src/functions/math.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use daft_functions::numeric::{
};
use spark_connect::Expression;

use super::{FunctionModule, SparkFunction, Todo};
use super::{FunctionModule, SparkFunction, TODO_FUNCTION};
use crate::{
error::{ConnectError, ConnectResult},
invalid_argument_err,
Expand All @@ -36,60 +36,60 @@ impl FunctionModule for MathFunctions {
parent.add_fn("atan", ArcTan);
parent.add_fn("atanh", ArcTanh);
parent.add_fn("atan2", Atan2 {});
parent.add_fn("bin", Todo);
parent.add_fn("bin", TODO_FUNCTION);
parent.add_fn("cbrt", Cbrt {});
parent.add_fn("ceil", Ceil {});
parent.add_fn("ceiling", Ceil {});
parent.add_fn("conv", Todo);
parent.add_fn("conv", TODO_FUNCTION);
parent.add_fn("cos", Cos {});
parent.add_fn("cosh", Todo);
parent.add_fn("cosh", TODO_FUNCTION);
parent.add_fn("cot", Cot {});
parent.add_fn("csc", Todo);
parent.add_fn("e", Todo);
parent.add_fn("csc", TODO_FUNCTION);
parent.add_fn("e", TODO_FUNCTION);
parent.add_fn("exp", Exp {});
parent.add_fn("expm1", Todo);
parent.add_fn("factorial", Todo);
parent.add_fn("expm1", TODO_FUNCTION);
parent.add_fn("factorial", TODO_FUNCTION);
parent.add_fn("floor", Floor {});
parent.add_fn("hex", Todo);
parent.add_fn("unhex", Todo);
parent.add_fn("hypot", Todo);
parent.add_fn("hex", TODO_FUNCTION);
parent.add_fn("unhex", TODO_FUNCTION);
parent.add_fn("hypot", TODO_FUNCTION);
parent.add_fn("ln", Ln {});
parent.add_fn("log", LogFunction);
parent.add_fn("log10", Log10 {});
parent.add_fn("log1p", Todo);
parent.add_fn("log1p", TODO_FUNCTION);
parent.add_fn("log2", Log2 {});
parent.add_fn("negate", Todo);
parent.add_fn("negative", Todo);
parent.add_fn("pi", Todo);
parent.add_fn("pmod", Todo);
parent.add_fn("positive", Todo);
parent.add_fn("pow", Todo);
parent.add_fn("power", Todo);
parent.add_fn("rint", Todo);
parent.add_fn("negate", TODO_FUNCTION);
parent.add_fn("negative", TODO_FUNCTION);
parent.add_fn("pi", TODO_FUNCTION);
parent.add_fn("pmod", TODO_FUNCTION);
parent.add_fn("positive", TODO_FUNCTION);
parent.add_fn("pow", TODO_FUNCTION);
parent.add_fn("power", TODO_FUNCTION);
parent.add_fn("rint", TODO_FUNCTION);
parent.add_fn("round", RoundFunction);
parent.add_fn("bround", Todo);
parent.add_fn("sec", Todo);
parent.add_fn("shiftleft", Todo);
parent.add_fn("shiftright", Todo);
parent.add_fn("sign", Todo);
parent.add_fn("signum", Todo);
parent.add_fn("bround", TODO_FUNCTION);
parent.add_fn("sec", TODO_FUNCTION);
parent.add_fn("shiftleft", TODO_FUNCTION);
parent.add_fn("shiftright", TODO_FUNCTION);
parent.add_fn("sign", TODO_FUNCTION);
parent.add_fn("signum", TODO_FUNCTION);
parent.add_fn("sin", Sin {});
parent.add_fn("sinh", Todo);
parent.add_fn("sinh", TODO_FUNCTION);
parent.add_fn("tan", Tan {});
parent.add_fn("tanh", Todo);
parent.add_fn("toDegrees", Todo);
parent.add_fn("try_add", Todo);
parent.add_fn("try_avg", Todo);
parent.add_fn("try_divide", Todo);
parent.add_fn("try_multiply", Todo);
parent.add_fn("try_subtract", Todo);
parent.add_fn("try_sum", Todo);
parent.add_fn("try_to_binary", Todo);
parent.add_fn("try_to_number", Todo);
parent.add_fn("tanh", TODO_FUNCTION);
parent.add_fn("toDegrees", TODO_FUNCTION);
parent.add_fn("try_add", TODO_FUNCTION);
parent.add_fn("try_avg", TODO_FUNCTION);
parent.add_fn("try_divide", TODO_FUNCTION);
parent.add_fn("try_multiply", TODO_FUNCTION);
parent.add_fn("try_subtract", TODO_FUNCTION);
parent.add_fn("try_sum", TODO_FUNCTION);
parent.add_fn("try_to_binary", TODO_FUNCTION);
parent.add_fn("try_to_number", TODO_FUNCTION);
parent.add_fn("degrees", Degrees {});
parent.add_fn("toRadians", Todo);
parent.add_fn("toRadians", TODO_FUNCTION);
parent.add_fn("radians", Radians {});
parent.add_fn("width_bucket", Todo);
parent.add_fn("width_bucket", TODO_FUNCTION);
//
}
}
Expand Down
1 change: 1 addition & 0 deletions src/daft-connect/src/functions/partition_transform.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use crate::{
spark_analyzer::SparkAnalyzer,
};

// https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/functions.html#partition-transformation-functions
pub struct PartitionTransformFunctions;

impl FunctionModule for PartitionTransformFunctions {
Expand Down
Loading

0 comments on commit 8f10933

Please sign in to comment.