Skip to content

Commit

Permalink
feat: FlightSQL support nested types (#257)
Browse files Browse the repository at this point in the history
  • Loading branch information
b41sh authored Oct 25, 2023
1 parent c4f90c2 commit a13f402
Show file tree
Hide file tree
Showing 6 changed files with 180 additions and 43 deletions.
3 changes: 3 additions & 0 deletions bindings/nodejs/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,9 @@ impl ToNapiValue for Value {
NaiveDateTime::new(v, NaiveTime::from_hms_opt(0, 0, 0).unwrap()),
)
}
databend_driver::Value::Array(_) => String::to_napi_value(env, format!("{}", val.0)),
databend_driver::Value::Map(_) => String::to_napi_value(env, format!("{}", val.0)),
databend_driver::Value::Tuple(_) => String::to_napi_value(env, format!("{}", val.0)),
databend_driver::Value::Bitmap(s) => String::to_napi_value(env, s),
databend_driver::Value::Variant(s) => String::to_napi_value(env, s),
}
Expand Down
4 changes: 4 additions & 0 deletions cli/tests/00-base.result
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,8 @@ Asia/Shanghai
0 0.00
1 1.00
2 2.00
[1,2,3] NULL (1,'ab')
NULL {'k1':'v1','k2':'v2'} (2,NULL)
1 NULL 1 ab
NULL v1 2 NULL
bye
6 changes: 6 additions & 0 deletions cli/tests/00-base.sql
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,12 @@ insert into test_decimal select number, number from numbers(3);

select * from test_decimal;

drop table if exists test_nested;
create table test_nested(a array(int), b map(string, string), c tuple(x int, y string null));
insert into test_nested values([1,2,3], null, (1, 'ab')), (null, {'k1':'v1', 'k2':'v2'}, (2, null));
select * from test_nested;
select a[1], b['k1'], c:x, c:y from test_nested;

select 'bye';
drop table test;
drop table test_decimal;
2 changes: 1 addition & 1 deletion sql/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ databend-client = { workspace = true }
chrono = { version = "0.4", default-features = false }
glob = "0.3"
itertools = "0.11"
jsonb = "0.2"
jsonb = "0.3"
roaring = { version = "0.10.1", features = ["serde"] }
serde = { version = "1.0", default-features = false, features = ["derive"] }
serde_json = { version = "1.0", default-features = false, features = ["std"] }
Expand Down
22 changes: 19 additions & 3 deletions sql/src/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,6 @@ impl DecimalDataType {
#[derive(Debug, Clone)]
pub enum DataType {
Null,
Nothing,
EmptyArray,
EmptyMap,
Boolean,
Expand Down Expand Up @@ -103,7 +102,6 @@ impl std::fmt::Display for DataType {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
match self {
DataType::Null => write!(f, "Null"),
DataType::Nothing => write!(f, "Nothing"),
DataType::EmptyArray => write!(f, "EmptyArray"),
DataType::EmptyMap => write!(f, "EmptyMap"),
DataType::Boolean => write!(f, "Boolean"),
Expand Down Expand Up @@ -170,7 +168,6 @@ impl TryFrom<&TypeDesc<'_>> for DataType {
fn try_from(desc: &TypeDesc) -> Result<Self> {
let dt = match desc.name {
"Null" | "NULL" => DataType::Null,
"Nothing" => DataType::Nothing,
"Boolean" => DataType::Boolean,
"String" => DataType::String,
"Int8" => DataType::Number(NumberDataType::Int8),
Expand Down Expand Up @@ -329,6 +326,25 @@ impl TryFrom<&Arc<ArrowField>> for Field {
scale: *s as u8,
}))
}
ArrowDataType::List(f) | ArrowDataType::LargeList(f) => {
let inner_field = Field::try_from(f)?;
let inner_ty = inner_field.data_type;
DataType::Array(Box::new(inner_ty))
}
ArrowDataType::Map(f, _) => {
let inner_field = Field::try_from(f)?;
let inner_ty = inner_field.data_type;
DataType::Map(Box::new(inner_ty))
}
ArrowDataType::Struct(fs) => {
let mut inner_tys = Vec::with_capacity(fs.len());
for f in fs {
let inner_field = Field::try_from(f)?;
let inner_ty = inner_field.data_type;
inner_tys.push(inner_ty);
}
DataType::Tuple(inner_tys)
}
_ => {
return Err(Error::Parsing(format!(
"Unsupported datatype for arrow field: {:?}",
Expand Down
186 changes: 147 additions & 39 deletions sql/src/value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ use {
arrow_array::{
Array as ArrowArray, BinaryArray, BooleanArray, Date32Array, Decimal128Array,
Decimal256Array, Float32Array, Float64Array, Int16Array, Int32Array, Int64Array, Int8Array,
LargeBinaryArray, LargeStringArray, StringArray, TimestampMicrosecondArray, UInt16Array,
UInt32Array, UInt64Array, UInt8Array,
LargeBinaryArray, LargeListArray, LargeStringArray, ListArray, MapArray, StringArray,
StructArray, TimestampMicrosecondArray, UInt16Array, UInt32Array, UInt64Array, UInt8Array,
},
arrow_schema::{DataType as ArrowDataType, Field as ArrowField, TimeUnit},
std::sync::Arc,
Expand Down Expand Up @@ -71,9 +71,9 @@ pub enum Value {
/// Microseconds from 1970-01-01 00:00:00 UTC
Timestamp(i64),
Date(i32),
// Array(Vec<Value>),
// Map(Vec<(Value, Value)>),
// Tuple(Vec<Value>),
Array(Vec<Value>),
Map(Vec<(Value, Value)>),
Tuple(Vec<Value>),
Bitmap(String),
Variant(String),
}
Expand Down Expand Up @@ -103,10 +103,25 @@ impl Value {
Self::Timestamp(_) => DataType::Timestamp,

Self::Date(_) => DataType::Date,
// TODO:(everpcpc) fix nested type
// Self::Array(v) => DataType::Array(Box::new(v[0].get_type())),
// Self::Map(_) => DataType::Map(Box::new(DataType::Null)),
// Self::Tuple(_) => DataType::Tuple(vec![]),
Self::Array(vals) => {
if vals.is_empty() {
DataType::EmptyArray
} else {
DataType::Array(Box::new(vals[0].get_type()))
}
}
Self::Map(kvs) => {
if kvs.is_empty() {
DataType::EmptyMap
} else {
let inner_ty = DataType::Tuple(vec![kvs[0].0.get_type(), kvs[0].1.get_type()]);
DataType::Map(Box::new(inner_ty))
}
}
Self::Tuple(vals) => {
let inner_tys = vals.iter().map(|v| v.get_type()).collect::<Vec<_>>();
DataType::Tuple(inner_tys)
}
Self::Bitmap(_) => DataType::Bitmap,
Self::Variant(_) => DataType::Variant,
}
Expand Down Expand Up @@ -360,20 +375,61 @@ impl TryFrom<(&ArrowField, &Arc<dyn ArrowArray>, usize)> for Value {
Some(array) => Ok(Value::Date(array.value(seq))),
None => Err(ConvertError::new("date", format!("{:?}", array)).into()),
},
ArrowDataType::Date64
| ArrowDataType::Time32(_)
| ArrowDataType::Time64(_)
| ArrowDataType::Interval(_)
| ArrowDataType::Duration(_) => {
Err(ConvertError::new("unsupported data type", format!("{:?}", array)).into())
}
// ArrowDataType::List(_) | ArrowDataType::LargeList(_) => {
// let v = array.as_list_opt::<i64>().unwrap().value(seq);
// Ok(Value::String(format!("{:?}", v)))
// }
// Struct(Vec<Field>),
// Map(Box<Field>, bool),
// RunEndEncoded(Box<Field>, Box<Field>),
ArrowDataType::List(f) => match array.as_any().downcast_ref::<ListArray>() {
Some(array) => {
let inner_array = unsafe { array.value_unchecked(seq) };
let mut values = Vec::with_capacity(inner_array.len());
for i in 0..inner_array.len() {
let value = Value::try_from((f.as_ref(), &inner_array, i))?;
values.push(value);
}
Ok(Value::Array(values))
}
None => Err(ConvertError::new("list", format!("{:?}", array)).into()),
},
ArrowDataType::LargeList(f) => match array.as_any().downcast_ref::<LargeListArray>() {
Some(array) => {
let inner_array = unsafe { array.value_unchecked(seq) };
let mut values = Vec::with_capacity(inner_array.len());
for i in 0..inner_array.len() {
let value = Value::try_from((f.as_ref(), &inner_array, i))?;
values.push(value);
}
Ok(Value::Array(values))
}
None => Err(ConvertError::new("large list", format!("{:?}", array)).into()),
},
ArrowDataType::Map(f, _) => match array.as_any().downcast_ref::<MapArray>() {
Some(array) => {
if let ArrowDataType::Struct(fs) = f.data_type() {
let inner_array = unsafe { array.value_unchecked(seq) };
let mut values = Vec::with_capacity(inner_array.len());
for i in 0..inner_array.len() {
let key = Value::try_from((fs[0].as_ref(), inner_array.column(0), i))?;
let val = Value::try_from((fs[1].as_ref(), inner_array.column(1), i))?;
values.push((key, val));
}
Ok(Value::Map(values))
} else {
Err(
ConvertError::new("invalid map inner type", format!("{:?}", array))
.into(),
)
}
}
None => Err(ConvertError::new("map", format!("{:?}", array)).into()),
},
ArrowDataType::Struct(fs) => match array.as_any().downcast_ref::<StructArray>() {
Some(array) => {
let mut values = Vec::with_capacity(array.len());
for (f, inner_array) in fs.iter().zip(array.columns().iter()) {
let value = Value::try_from((f.as_ref(), inner_array, seq))?;
values.push(value);
}
Ok(Value::Tuple(values))
}
None => Err(ConvertError::new("struct", format!("{:?}", array)).into()),
},
_ => Err(ConvertError::new("unsupported data type", format!("{:?}", array)).into()),
}
}
Expand Down Expand Up @@ -533,26 +589,78 @@ impl std::fmt::Display for NumberValue {

impl std::fmt::Display for Value {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Value::Null => write!(f, "NULL"),
Value::EmptyArray => write!(f, "[]"),
Value::EmptyMap => write!(f, "{{}}"),
Value::Boolean(b) => write!(f, "{}", b),
Value::Number(n) => write!(f, "{}", n),
Value::String(s) => write!(f, "{}", s),
Value::Timestamp(i) => {
let secs = i / 1_000_000;
let nanos = ((i % 1_000_000) * 1000) as u32;
let t = NaiveDateTime::from_timestamp_opt(secs, nanos).unwrap_or_default();
encode_value(f, self, true)
}
}

// Compatible with Databend, inner values of nested types are quoted.
fn encode_value(f: &mut std::fmt::Formatter<'_>, val: &Value, raw: bool) -> std::fmt::Result {
match val {
Value::Null => write!(f, "NULL"),
Value::EmptyArray => write!(f, "[]"),
Value::EmptyMap => write!(f, "{{}}"),
Value::Boolean(b) => write!(f, "{}", b),
Value::Number(n) => write!(f, "{}", n),
Value::String(s) | Value::Bitmap(s) | Value::Variant(s) => {
if raw {
write!(f, "{}", s)
} else {
write!(f, "'{}'", s)
}
}
Value::Timestamp(i) => {
let secs = i / 1_000_000;
let nanos = ((i % 1_000_000) * 1000) as u32;
let t = NaiveDateTime::from_timestamp_opt(secs, nanos).unwrap_or_default();
if raw {
write!(f, "{}", t)
} else {
write!(f, "'{}'", t)
}
Value::Date(i) => {
let days = i + DAYS_FROM_CE;
let d = NaiveDate::from_num_days_from_ce_opt(days).unwrap_or_default();
}
Value::Date(i) => {
let days = i + DAYS_FROM_CE;
let d = NaiveDate::from_num_days_from_ce_opt(days).unwrap_or_default();
if raw {
write!(f, "{}", d)
} else {
write!(f, "'{}'", d)
}
}
Value::Array(vals) => {
write!(f, "[")?;
for (i, val) in vals.iter().enumerate() {
if i > 0 {
write!(f, ",")?;
}
encode_value(f, val, false)?;
}
write!(f, "]")?;
Ok(())
}
Value::Map(kvs) => {
write!(f, "{{")?;
for (i, (key, val)) in kvs.iter().enumerate() {
if i > 0 {
write!(f, ",")?;
}
encode_value(f, key, false)?;
write!(f, ":")?;
encode_value(f, val, false)?;
}
write!(f, "}}")?;
Ok(())
}
Value::Tuple(vals) => {
write!(f, "(")?;
for (i, val) in vals.iter().enumerate() {
if i > 0 {
write!(f, ",")?;
}
encode_value(f, val, false)?;
}
Value::Bitmap(s) => write!(f, "{}", s),
Value::Variant(s) => write!(f, "{}", s),
write!(f, ")")?;
Ok(())
}
}
}
Expand Down

0 comments on commit a13f402

Please sign in to comment.