Skip to content

Commit

Permalink
feat: support decode nested types from string to values (#381)
Browse files Browse the repository at this point in the history
  • Loading branch information
b41sh authored Apr 11, 2024
1 parent 1e744f8 commit 062a189
Show file tree
Hide file tree
Showing 12 changed files with 899 additions and 21 deletions.
1 change: 1 addition & 0 deletions cli/tests/00-base.result
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,5 @@ Asia/Shanghai
NULL {'k1':'v1','k2':'v2'} (2,NULL)
1 NULL 1 ab
NULL v1 2 NULL
{'k1':'v1','k2':'v2'} [6162,78797A] ('[1,2]','SRID=4326;POINT(1 2)','2024-04-10')
bye
2 changes: 2 additions & 0 deletions cli/tests/00-base.sql
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ insert into test_nested values([1,2,3], null, (1, 'ab')), (null, {'k1':'v1', 'k2
select * from test_nested;
select a[1], b['k1'], c:x, c:y from test_nested;

select {'k1':'v1','k2':'v2'}, [to_binary('ab'), to_binary('xyz')], (parse_json('[1,2]'), st_geometryfromwkt('SRID=4326;POINT(1.0 2.0)'), to_date('2024-04-10'));

select 'bye';
drop table test;
drop table test_decimal;
Expand Down
103 changes: 93 additions & 10 deletions driver/tests/driver/select_simple.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,16 +177,99 @@ async fn select_nullable_u64() {
assert_eq!(val, Some(4950));
}

// TODO:(everpcoc) parse to real array
// #[tokio::test]
// async fn select_array() {
// let mut conn = prepare().await;
// let row = conn.query_row("select [1, 2, 3, 4, 5]").await.unwrap();
// assert!(row.is_some());
// let row = row.unwrap();
// let (val,): (String,) = row.try_into().unwrap();
// assert_eq!(val, "[1,2,3,4,5]");
// }
/// Selects several array literals in one row and checks that each column is
/// decoded into the matching nested `Value`: an empty array, an array of
/// `UInt8` numbers, an array of decimals and an array holding binary data.
#[tokio::test]
async fn select_array() {
    let conn = prepare().await;
    let sql = "select [], [1, 2, 3, 4, 5], [10::Decimal(15,2), 1.1+2.3], [to_binary('xyz')]";
    let row = conn.query_row(sql).await.unwrap();
    assert!(row.is_some());
    let row = row.unwrap();

    // Small constructors keep the expected value readable.
    let uint8 = |n| Value::Number(NumberValue::UInt8(n));
    let decimal = |digits| {
        Value::Number(NumberValue::Decimal128(
            digits,
            DecimalSize {
                precision: 4,
                scale: 2,
            },
        ))
    };

    let expected = vec![
        Value::EmptyArray,
        Value::Array(vec![uint8(1), uint8(2), uint8(3), uint8(4), uint8(5)]),
        // Both decimal elements are expected with precision 4 / scale 2.
        Value::Array(vec![decimal(1000), decimal(340)]),
        // 120, 121, 122 are the bytes of "xyz".
        Value::Array(vec![Value::Binary(vec![120, 121, 122])]),
    ];
    assert_eq!(row.values().to_owned(), expected);
}

/// Selects map literals and checks that they are decoded into `Value::EmptyMap`
/// / `Value::Map` with typed keys and values (strings and a date).
#[tokio::test]
async fn select_map() {
    let conn = prepare().await;
    let sql = "select {}, {'k1':'v1','k2':'v2'}, {'xx':to_date('2020-01-01')}";
    let row = conn.query_row(sql).await.unwrap();
    assert!(row.is_some());
    let row = row.unwrap();

    let text = |s: &str| Value::String(s.to_string());

    let expected = vec![
        Value::EmptyMap,
        Value::Map(vec![(text("k1"), text("v1")), (text("k2"), text("v2"))]),
        // 18262 is the day count since the Unix epoch for 2020-01-01.
        Value::Map(vec![(text("xx"), Value::Date(18262))]),
    ];
    assert_eq!(row.values().to_owned(), expected);
}

/// Selects two tuples with heterogeneous members (variant, array, boolean,
/// geometry, timestamp) and checks that every member is decoded into its own
/// typed `Value` inside `Value::Tuple`.
#[tokio::test]
async fn select_tuple() {
    let conn = prepare().await;
    let sql = "select (parse_json('[1,2]'), [1,2], true), (st_geometryfromwkt('SRID=4126;POINT(3.0 5.0)'), to_timestamp('2024-10-22 10:11:12'))";
    let row = conn.query_row(sql).await.unwrap();
    assert!(row.is_some());
    let row = row.unwrap();

    let uint8 = |n| Value::Number(NumberValue::UInt8(n));

    let first = Value::Tuple(vec![
        Value::Variant("[1,2]".to_string()),
        Value::Array(vec![uint8(1), uint8(2)]),
        Value::Boolean(true),
    ]);
    let second = Value::Tuple(vec![
        // The expected geometry text has the coordinates without the ".0" suffix.
        Value::Geometry("SRID=4126;POINT(3 5)".to_string()),
        // Microseconds since the Unix epoch for 2024-10-22 10:11:12 UTC.
        Value::Timestamp(1729591872000000),
    ]);
    assert_eq!(row.values().to_owned(), vec![first, second]);
}

#[tokio::test]
async fn select_multiple_columns() {
Expand Down
3 changes: 3 additions & 0 deletions sql/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,14 @@ flight-sql = ["dep:arrow-array", "dep:arrow-schema", "dep:tonic"]
[dependencies]
databend-client = { workspace = true }

memchr = "2.7.2"
chrono = { version = "0.4.35", default-features = false }
geozero = { version = "0.12.0", features = ["default", "with-wkb"] }
glob = "0.3"
itertools = "0.12"
jsonb = "0.3"
lexical-core = "0.8.5"

roaring = { version = "0.10", features = ["serde"] }
serde = { version = "1.0", default-features = false, features = ["derive"] }
serde_json = { version = "1.0", default-features = false, features = ["std"] }
Expand Down
33 changes: 33 additions & 0 deletions sql/src/cursor_ext/cursor_checkpoint_ext.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
// Copyright 2021 Datafuse Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::io::Cursor;

/// Save-and-restore of a reader's position.
///
/// `checkpoint` produces an opaque `u64` token and `rollback` accepts one.
/// In the `Cursor` implementation in this file the token is simply the
/// cursor position.
pub trait ReadCheckPointExt {
/// Returns a token describing the current read position.
fn checkpoint(&self) -> u64;
/// Restores the read position described by `checkpoint`.
fn rollback(&mut self, checkpoint: u64);
}

/// Checkpointing for in-memory cursors: the checkpoint token is the cursor's
/// byte position, and rolling back sets the position to that value again.
impl<T: AsRef<[u8]>> ReadCheckPointExt for Cursor<T> {
    /// Returns the cursor's current position.
    fn checkpoint(&self) -> u64 {
        Cursor::position(self)
    }

    /// Sets the cursor's position to `checkpoint`; the value is not validated.
    fn rollback(&mut self, checkpoint: u64) {
        Cursor::set_position(self, checkpoint);
    }
}
171 changes: 171 additions & 0 deletions sql/src/cursor_ext/cursor_read_bytes_ext.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
// Copyright 2021 Datafuse Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use memchr::memchr;
use std::io::BufRead;
use std::io::Cursor;
use std::io::ErrorKind;
use std::io::Result;

/// Byte-oriented helpers for hand-written parsers that read from a buffered
/// source: peeking, conditionally skipping bytes and copying runs of bytes.
pub trait ReadBytesExt {
    /// Returns the next byte as a `char` without consuming it, or `None` when
    /// nothing is left.
    fn peek(&mut self) -> Option<char>;

    /// Returns the next byte without consuming it, or `None` when nothing is
    /// left.
    fn peek_byte(&mut self) -> Option<u8>;

    /// Consumes the next byte if `f` accepts it; returns whether a byte was
    /// consumed.
    fn ignore(&mut self, f: impl Fn(u8) -> bool) -> bool;

    /// Consumes leading bytes for as long as `f` accepts them; returns how
    /// many were consumed.
    fn ignores(&mut self, f: impl Fn(u8) -> bool) -> usize;

    /// Consumes the next byte if it equals `b`; returns whether it did.
    fn ignore_byte(&mut self, b: u8) -> bool;

    /// Consumes `bs` if the input starts with it; returns whether it did.
    fn ignore_bytes(&mut self, bs: &[u8]) -> bool;

    /// Consumes leading ASCII whitespace; returns whether any was consumed.
    fn ignore_white_spaces(&mut self) -> bool;

    /// Copies bytes into `buf` up to and including `delim`; returns the
    /// number of bytes consumed.
    fn until(&mut self, delim: u8, buf: &mut Vec<u8>) -> usize;

    /// Copies leading bytes into `buf` for as long as `f` accepts them;
    /// returns the number of bytes consumed.
    fn keep_read(&mut self, buf: &mut Vec<u8>, f: impl Fn(u8) -> bool) -> usize;

    /// Like [`ReadBytesExt::ignore`], but a byte that is missing or rejected
    /// by `f` is an error.
    ///
    /// # Errors
    ///
    /// Returns an `InvalidData` error when no byte was consumed.
    fn must_ignore(&mut self, f: impl Fn(u8) -> bool) -> Result<()> {
        if self.ignore(f) {
            Ok(())
        } else {
            Err(std::io::Error::new(
                ErrorKind::InvalidData,
                "Expected to ignore a byte",
            ))
        }
    }

    /// Like [`ReadBytesExt::ignore_byte`], but a missing or different byte is
    /// an error.
    fn must_ignore_byte(&mut self, b: u8) -> Result<()>;
}

/// `ReadBytesExt` for in-memory cursors.
///
/// Every method works on the unread tail of the cursor (as returned by
/// `BufRead::fill_buf`) and advances the cursor with `BufRead::consume`.
/// A failing `fill_buf` is treated as "nothing to read": the methods return
/// `None`, `false` or `0` instead of propagating the error.
impl<T> ReadBytesExt for Cursor<T>
where
    T: AsRef<[u8]>,
{
    /// Returns the next byte converted to `char`, without consuming it.
    ///
    /// The conversion is a plain byte-to-char mapping; multi-byte UTF-8
    /// sequences are not decoded.
    fn peek(&mut self) -> Option<char> {
        self.peek_byte().map(char::from)
    }

    /// Returns the next byte without consuming it, or `None` at end of input.
    fn peek_byte(&mut self) -> Option<u8> {
        self.fill_buf().ok()?.first().copied()
    }

    /// Consumes one byte if it exists and `f` accepts it.
    ///
    /// Returns `true` when a byte was consumed.
    fn ignore(&mut self, f: impl Fn(u8) -> bool) -> bool {
        match self.peek_byte() {
            Some(byte) if f(byte) => {
                self.consume(1);
                true
            }
            _ => false,
        }
    }

    /// Consumes the longest prefix whose bytes are all accepted by `f`.
    ///
    /// Returns the number of bytes consumed (possibly `0`).
    fn ignores(&mut self, f: impl Fn(u8) -> bool) -> usize {
        let skipped = match self.fill_buf() {
            Ok(available) => available.iter().take_while(|&&byte| f(byte)).count(),
            Err(_) => return 0,
        };
        self.consume(skipped);
        skipped
    }

    /// Consumes the next byte if it equals `b`.
    fn ignore_byte(&mut self, b: u8) -> bool {
        self.ignore(|c| c == b)
    }

    /// Consumes `bs` if the unread input starts with it.
    ///
    /// Nothing is consumed on a mismatch or when fewer than `bs.len()` bytes
    /// remain. An empty `bs` always matches.
    fn ignore_bytes(&mut self, bs: &[u8]) -> bool {
        let matched = matches!(self.fill_buf(), Ok(available) if available.starts_with(bs));
        if matched {
            self.consume(bs.len());
        }
        matched
    }

    /// Consumes the next byte if it equals `b`, otherwise fails.
    ///
    /// # Errors
    ///
    /// Returns an `InvalidData` error naming the expected character, the
    /// character actually found (if any) and the cursor position.
    fn must_ignore_byte(&mut self, b: u8) -> Result<()> {
        if self.ignore_byte(b) {
            return Ok(());
        }
        let found = self.peek();
        let pos = self.position();
        Err(std::io::Error::new(
            ErrorKind::InvalidData,
            format!(
                "Expected to have char '{}', got '{:?}' at pos {}",
                b as char, found, pos
            ),
        ))
    }

    /// Consumes leading ASCII whitespace; returns `true` if any was consumed.
    fn ignore_white_spaces(&mut self) -> bool {
        self.ignores(|c| c.is_ascii_whitespace()) > 0
    }

    /// Appends bytes to `buf` up to and including the first `delim`, or all
    /// remaining bytes when `delim` does not occur.
    ///
    /// Returns the number of bytes consumed. Existing contents of `buf` are
    /// kept and do not influence how much is read.
    fn until(&mut self, delim: u8, buf: &mut Vec<u8>) -> usize {
        let to_read = match self.fill_buf() {
            Ok(remaining) => {
                // Without a delimiter, take the rest of the *input*; the
                // length of the output buffer is unrelated to it.
                let to_read = memchr(delim, remaining).map_or(remaining.len(), |n| n + 1);
                buf.extend_from_slice(&remaining[..to_read]);
                to_read
            }
            Err(_) => return 0,
        };
        self.consume(to_read);
        to_read
    }

    /// Appends to `buf` the longest prefix whose bytes are all accepted by
    /// `f` and consumes it.
    ///
    /// Returns the number of bytes consumed.
    fn keep_read(&mut self, buf: &mut Vec<u8>, f: impl Fn(u8) -> bool) -> usize {
        let to_read = match self.fill_buf() {
            Ok(remaining) => {
                let to_read = remaining
                    .iter()
                    .position(|&byte| !f(byte))
                    .unwrap_or(remaining.len());
                buf.extend_from_slice(&remaining[..to_read]);
                to_read
            }
            Err(_) => return 0,
        };
        self.consume(to_read);
        to_read
    }
}
Loading

0 comments on commit 062a189

Please sign in to comment.