Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(query): introduce arrow-udf-js #14799

Merged
merged 35 commits into from
Mar 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
de1e66c
feat: support code string block
sundy-li Feb 28, 2024
055874b
feat: support udf interpreter create
sundy-li Feb 28, 2024
82b3492
feat: support udf interpreter create
sundy-li Feb 28, 2024
c040002
feat: update
sundy-li Feb 28, 2024
7ac012b
feat: upgrade arrow to version 50
sundy-li Feb 28, 2024
2d5bc31
Merge branch 'main' into arrow-udf
sundy-li Feb 28, 2024
e75149e
feat: update
sundy-li Feb 28, 2024
ef3e528
feat: update
sundy-li Feb 29, 2024
eb677a5
feat: update
sundy-li Feb 29, 2024
712f28e
Merge branch 'main' into arrow-50
sundy-li Feb 29, 2024
04adb0b
Merge branch 'arrow-50' into arrow-udf
sundy-li Feb 29, 2024
30e9be4
Merge branch 'main' into arrow-50
sundy-li Feb 29, 2024
ac48f52
feat: update
sundy-li Feb 29, 2024
05f579f
feat: update
sundy-li Feb 29, 2024
c90dd25
feat: update
sundy-li Feb 29, 2024
e18af81
feat: update
sundy-li Feb 29, 2024
c874706
feat: update
sundy-li Feb 29, 2024
345f8d9
feat: update
sundy-li Feb 29, 2024
34f79a7
Merge branch 'arrow-50' into arrow-udf
sundy-li Feb 29, 2024
b2a6e20
Merge branch 'main' into arrow-50
sundy-li Feb 29, 2024
11291a2
Merge branch 'arrow-50' into arrow-udf
sundy-li Feb 29, 2024
d178bf0
Merge branch 'main' into arrow-udf
sundy-li Mar 2, 2024
01da099
update
sundy-li Mar 3, 2024
ba59dd3
update
sundy-li Mar 4, 2024
52d9905
update
sundy-li Mar 4, 2024
145770a
update
sundy-li Mar 4, 2024
8060783
update
sundy-li Mar 4, 2024
b7bacc5
update
sundy-li Mar 4, 2024
da5761f
Merge branch 'main' into arrow-udf
sundy-li Mar 4, 2024
28f5701
update
sundy-li Mar 4, 2024
54a48e6
update
sundy-li Mar 4, 2024
0cbdb0f
update
sundy-li Mar 4, 2024
84aac10
update
sundy-li Mar 4, 2024
bfd1a72
update
sundy-li Mar 4, 2024
f9fddc9
Merge branch 'main' into arrow-udf
BohuTANG Mar 4, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 41 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ async-trait = { version = "0.1.77", package = "async-trait-fn" }
bincode = { version = "2.0.0-rc.3", features = ["serde", "std", "alloc"] }
borsh = { version = "1.2.1", features = ["derive"] }
bytes = "1.5.0"
hashbrown = { version = "0.14.3", default-features = false }
byteorder = "1.4.3"
chrono = { version = "0.4.31", features = ["serde"] }
chrono-tz = { version = "0.8", features = ["serde"] }
Expand Down
1 change: 1 addition & 0 deletions src/meta/app/src/principal/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ pub use user_auth::PasswordHashMethod;
pub use user_defined_file_format::UserDefinedFileFormat;
pub use user_defined_function::LambdaUDF;
pub use user_defined_function::UDFDefinition;
pub use user_defined_function::UDFScript;
pub use user_defined_function::UDFServer;
pub use user_defined_function::UdfName;
pub use user_defined_function::UserDefinedFunction;
Expand Down
56 changes: 56 additions & 0 deletions src/meta/app/src/principal/user_defined_function.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,21 @@ pub struct UDFServer {
pub return_type: DataType,
}

/// Definition of a user-defined function whose body is an inline script.
///
/// `Display for UDFDefinition` renders this variant as
/// `<arg types>) RETURNS {return_type} LANGUAGE {language}
/// RUNTIME_VERSION = {runtime_version} HANDLER = {handler} AS $${code}$$`.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct UDFScript {
/// Script source text; printed between `$$` delimiters by `Display`.
pub code: String,
/// Value printed after `HANDLER =`.
/// NOTE(review): presumably the name of the entry-point function inside `code` — confirm.
pub handler: String,
/// Value printed after `LANGUAGE` (the script language identifier).
pub language: String,
/// Argument types of the function, in declaration order.
pub arg_types: Vec<DataType>,
/// Type of the value the function returns.
pub return_type: DataType,
/// Value printed after `RUNTIME_VERSION =`.
/// NOTE(review): presumably the version of the script runtime to use — confirm.
pub runtime_version: String,
}

/// The body of a user-defined function; exactly one of the supported kinds.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum UDFDefinition {
/// A lambda-style UDF described by a [`LambdaUDF`] value.
LambdaUDF(LambdaUDF),
/// A UDF described by a [`UDFServer`] value (carries an `ADDRESS` when displayed).
UDFServer(UDFServer),
/// A UDF whose source code is stored inline, described by a [`UDFScript`] value.
UDFScript(UDFScript),
}

#[derive(Clone, Debug, Eq, PartialEq)]
Expand Down Expand Up @@ -108,6 +119,31 @@ impl UserDefinedFunction {
created_on: Utc::now(),
}
}

/// Builds a `UserDefinedFunction` whose definition is `UDFDefinition::UDFScript`.
///
/// All string arguments are copied into owned `String`s, `arg_types` and
/// `return_type` are moved into the script definition, and `created_on` is
/// set to the current UTC time.
pub fn create_udf_script(
    name: &str,
    code: &str,
    handler: &str,
    language: &str,
    arg_types: Vec<DataType>,
    return_type: DataType,
    runtime_version: &str,
    description: &str,
) -> Self {
    // Assemble the script-specific part first, then wrap it.
    let script = UDFScript {
        code: code.to_owned(),
        handler: handler.to_owned(),
        language: language.to_owned(),
        arg_types,
        return_type,
        runtime_version: runtime_version.to_owned(),
    };

    Self {
        name: name.to_owned(),
        description: description.to_owned(),
        definition: UDFDefinition::UDFScript(script),
        created_on: Utc::now(),
    }
}
}

impl Display for UDFDefinition {
Expand Down Expand Up @@ -144,6 +180,26 @@ impl Display for UDFDefinition {
") RETURNS {return_type} LANGUAGE {language} HANDLER = {handler} ADDRESS = {address}"
)?;
}

UDFDefinition::UDFScript(UDFScript {
code,
arg_types,
return_type,
handler,
language,
runtime_version,
}) => {
for (i, item) in arg_types.iter().enumerate() {
if i > 0 {
write!(f, ", ")?;
}
write!(f, "{item}")?;
}
write!(
f,
") RETURNS {return_type} LANGUAGE {language} RUNTIME_VERSION = {runtime_version} HANDLER = {handler} AS $${code}$$"
)?;
}
}
Ok(())
}
Expand Down
64 changes: 64 additions & 0 deletions src/meta/proto-conv/src/udf_from_to_protobuf_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,64 @@ impl FromToProto for mt::UDFServer {
}
}

impl FromToProto for mt::UDFScript {
type PB = pb::UdfScript;
fn get_pb_ver(p: &Self::PB) -> u64 {
p.ver
}
fn from_pb(p: pb::UdfScript) -> Result<Self, Incompatible> {
reader_check_msg(p.ver, p.min_reader_ver)?;

let mut arg_types = Vec::with_capacity(p.arg_types.len());
for arg_type in p.arg_types {
let arg_type = DataType::from(&TableDataType::from_pb(arg_type)?);
arg_types.push(arg_type);
}
let return_type = DataType::from(&TableDataType::from_pb(p.return_type.ok_or_else(
|| Incompatible {
reason: "UDFScript.return_type can not be None".to_string(),
},
)?)?);

Ok(mt::UDFScript {
code: p.code,
arg_types,
return_type,
handler: p.handler,
language: p.language,
runtime_version: p.runtime_version,
})
}

fn to_pb(&self) -> Result<pb::UdfScript, Incompatible> {
let mut arg_types = Vec::with_capacity(self.arg_types.len());
for arg_type in self.arg_types.iter() {
let arg_type = infer_schema_type(arg_type)
.map_err(|e| Incompatible {
reason: format!("Convert DataType to TableDataType failed: {}", e.message()),
})?
.to_pb()?;
arg_types.push(arg_type);
}
let return_type = infer_schema_type(&self.return_type)
.map_err(|e| Incompatible {
reason: format!("Convert DataType to TableDataType failed: {}", e.message()),
})?
.to_pb()?;

Ok(pb::UdfScript {
ver: VER,
min_reader_ver: MIN_READER_VER,
code: self.code.clone(),
handler: self.handler.clone(),
language: self.language.clone(),
arg_types,
return_type: Some(return_type),
runtime_version: self.runtime_version.clone(),
})
}
}

impl FromToProto for mt::UserDefinedFunction {
type PB = pb::UserDefinedFunction;
fn get_pb_ver(p: &Self::PB) -> u64 {
Expand All @@ -120,6 +178,9 @@ impl FromToProto for mt::UserDefinedFunction {
Some(pb::user_defined_function::Definition::UdfServer(udf_server)) => {
mt::UDFDefinition::UDFServer(mt::UDFServer::from_pb(udf_server)?)
}
Some(pb::user_defined_function::Definition::UdfScript(udf_script)) => {
mt::UDFDefinition::UDFScript(mt::UDFScript::from_pb(udf_script)?)
}
None => {
return Err(Incompatible {
reason: "UserDefinedFunction.definition cannot be None".to_string(),
Expand All @@ -146,6 +207,9 @@ impl FromToProto for mt::UserDefinedFunction {
mt::UDFDefinition::UDFServer(udf_server) => {
pb::user_defined_function::Definition::UdfServer(udf_server.to_pb()?)
}
mt::UDFDefinition::UDFScript(udf_script) => {
pb::user_defined_function::Definition::UdfScript(udf_script.to_pb()?)
}
};

Ok(pb::UserDefinedFunction {
Expand Down
3 changes: 2 additions & 1 deletion src/meta/proto-conv/src/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,8 @@ const META_CHANGE_LOG: &[(u64, &str)] = &[
(77, "2024-01-22: Remove: allow_anonymous in S3 Config", ),
(78, "2024-01-29: Refactor: GrantEntry::UserPrivilegeType and ShareGrantEntry::ShareGrantObjectPrivilege use from_bits_truncate deserialize", ),
(79, "2024-01-31: Add: udf.proto/UserDefinedFunction add created_on field", ),
(80, "2024-02-01: Add: Add: datatype.proto/DataType Geometry type")
(80, "2024-02-01: Add: Add: datatype.proto/DataType Geometry type"),
(81, "2024-03-04: Add: Add: udf.udf_script")
// Dear developer:
// If you're gonna add a new metadata version, you'll have to add a test for it.
// You could just copy an existing test file(e.g., `../tests/it/v024_table_meta.rs`)
Expand Down
2 changes: 1 addition & 1 deletion src/meta/proto-conv/tests/it/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,4 +83,4 @@ mod v076_role_ownership_info;
mod v077_s3_remove_allow_anonymous;
mod v078_grantentry;
mod v079_udf_created_on;
mod v080_geometry_datatype;
mod v081_udf_script;
92 changes: 92 additions & 0 deletions src/meta/proto-conv/tests/it/v081_udf_script.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
// Copyright 2023 Datafuse Labs.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use chrono::DateTime;
use chrono::Utc;
use databend_common_expression::types::DataType;
use databend_common_expression::types::NumberDataType;
use databend_common_meta_app::principal::LambdaUDF;
use databend_common_meta_app::principal::UDFDefinition;
use databend_common_meta_app::principal::UDFServer;
use databend_common_meta_app::principal::UserDefinedFunction;
use minitrace::func_name;

use crate::common;

// These bytes are built when a new version is introduced,
// and are kept for backward compatibility test.
//
// *************************************************************
// * These messages should never be updated, *
// * only be added when a new version is added, *
// * or be removed when an old version is no longer supported. *
// *************************************************************
//
// The message bytes are built from the output of `test_pb_from_to()`

// Checks a `UserDefinedFunction` carrying a `UDFDefinition::UDFServer`
// (python handler) against stored bytes at meta version 81.
//
// NOTE(review): this file is named for `udf_script`, but this test exercises
// the `UDFServer` variant; no test here builds a `UDFDefinition::UDFScript`.
// Consider adding one so the new message has a compatibility fixture.
#[test]
fn test_decode_v81_udf_python() -> anyhow::Result<()> {
// Serialized message; the ASCII run near the end spells
// "2023-12-15 01:26:09 UTC", matching `created_on` below.
let bytes = vec![
10, 8, 112, 108, 117, 115, 95, 105, 110, 116, 18, 21, 84, 104, 105, 115, 32, 105, 115, 32,
97, 32, 100, 101, 115, 99, 114, 105, 112, 116, 105, 111, 110, 34, 107, 10, 21, 104, 116,
116, 112, 58, 47, 47, 108, 111, 99, 97, 108, 104, 111, 115, 116, 58, 56, 56, 56, 56, 18,
11, 112, 108, 117, 115, 95, 105, 110, 116, 95, 112, 121, 26, 6, 112, 121, 116, 104, 111,
110, 34, 17, 154, 2, 8, 58, 0, 160, 6, 81, 168, 6, 24, 160, 6, 81, 168, 6, 24, 34, 17, 154,
2, 8, 58, 0, 160, 6, 81, 168, 6, 24, 160, 6, 81, 168, 6, 24, 42, 17, 154, 2, 8, 66, 0, 160,
6, 81, 168, 6, 24, 160, 6, 81, 168, 6, 24, 160, 6, 81, 168, 6, 24, 42, 23, 50, 48, 50, 51,
45, 49, 50, 45, 49, 53, 32, 48, 49, 58, 50, 54, 58, 48, 57, 32, 85, 84, 67, 160, 6, 81,
168, 6, 24,
];

// Expected value; a closure so a fresh copy can be handed to each helper.
let want = || UserDefinedFunction {
name: "plus_int".to_string(),
description: "This is a description".to_string(),
definition: UDFDefinition::UDFServer(UDFServer {
address: "http://localhost:8888".to_string(),
handler: "plus_int_py".to_string(),
language: "python".to_string(),
arg_types: vec![
DataType::Number(NumberDataType::Int32),
DataType::Number(NumberDataType::Int32),
],
return_type: DataType::Number(NumberDataType::Int64),
}),
// 1702603569 s after the epoch is 2023-12-15 01:26:09 UTC.
created_on: DateTime::<Utc>::from_timestamp(1702603569, 0).unwrap(),
};

// Pass the expected value through the `test_pb_from_to` helper, then hand the
// stored bytes, version 81 and the expected value to `test_load_old`.
common::test_pb_from_to(func_name!(), want())?;
common::test_load_old(func_name!(), bytes.as_slice(), 81, want())
}

// Checks a `UserDefinedFunction` carrying a `UDFDefinition::LambdaUDF`
// against stored bytes at meta version 81.
#[test]
fn test_decode_v81_udf_sql() -> anyhow::Result<()> {
// Serialized message; the ASCII run near the end spells
// "1975-05-25 16:39:44 UTC", matching `created_on` below.
let bytes = vec![
10, 10, 105, 115, 110, 111, 116, 101, 109, 112, 116, 121, 18, 21, 84, 104, 105, 115, 32,
105, 115, 32, 97, 32, 100, 101, 115, 99, 114, 105, 112, 116, 105, 111, 110, 26, 34, 10, 1,
112, 18, 23, 40, 112, 41, 32, 45, 62, 32, 40, 78, 79, 84, 32, 105, 115, 95, 110, 117, 108,
108, 40, 112, 41, 41, 160, 6, 81, 168, 6, 24, 42, 23, 49, 57, 55, 53, 45, 48, 53, 45, 50,
53, 32, 49, 54, 58, 51, 57, 58, 52, 52, 32, 85, 84, 67, 160, 6, 81, 168, 6, 24,
];
// Expected value; a closure so a fresh copy can be handed to each helper.
let want = || UserDefinedFunction {
name: "isnotempty".to_string(),
description: "This is a description".to_string(),
definition: UDFDefinition::LambdaUDF(LambdaUDF {
parameters: vec!["p".to_string()],
definition: "(p) -> (NOT is_null(p))".to_string(),
}),
// 170267984 s after the epoch is 1975-05-25 16:39:44 UTC.
created_on: DateTime::<Utc>::from_timestamp(170267984, 0).unwrap(),
};

// Pass the expected value through the `test_pb_from_to` helper, then hand the
// stored bytes, version 81 and the expected value to `test_load_old`.
common::test_pb_from_to(func_name!(), want())?;
common::test_load_old(func_name!(), bytes.as_slice(), 81, want())
}
14 changes: 14 additions & 0 deletions src/meta/protos/proto/udf.proto
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,19 @@ message UDFServer {
DataType return_type = 5;
}

// A user-defined function whose source code is stored inline.
message UDFScript {
// Version of this message, and the minimum reader version allowed to decode it.
uint64 ver = 100;
uint64 min_reader_ver = 101;

// Script source text.
string code = 1;
// Handler name within the script.
string handler = 2;
// Script language identifier.
string language = 3;
// Argument types, in declaration order.
repeated DataType arg_types = 4;
// Return type; must be present when decoding.
DataType return_type = 5;
// Runtime version string for the script language.
string runtime_version = 6;
}


message UserDefinedFunction {
uint64 ver = 100;
uint64 min_reader_ver = 101;
Expand All @@ -46,6 +59,7 @@ message UserDefinedFunction {
oneof definition {
LambdaUDF lambda_udf = 3;
UDFServer udf_server = 4;
UDFScript udf_script = 6;
}
// The time udf created.
optional string created_on = 5;
Expand Down
Loading
Loading