From 04696e46c241e7719ec72efd2bd9fa3b0bd7e4e8 Mon Sep 17 00:00:00 2001 From: everpcpc Date: Wed, 4 Dec 2024 13:30:16 +0800 Subject: [PATCH] feat(bindings/nodejs): support value opt variant_as_object (#518) --- bindings/nodejs/Cargo.toml | 1 + bindings/nodejs/README.md | 28 ++- bindings/nodejs/generated.js | 5 +- bindings/nodejs/index.d.ts | 17 +- bindings/nodejs/src/lib.rs | 393 +++++++++++++++++-------------- bindings/nodejs/tests/binding.js | 37 ++- driver/src/conn.rs | 34 +-- sql/src/rows.rs | 104 ++++---- sql/src/schema.rs | 2 +- 9 files changed, 361 insertions(+), 260 deletions(-) diff --git a/bindings/nodejs/Cargo.toml b/bindings/nodejs/Cargo.toml index 9740ee2d..d7c44968 100644 --- a/bindings/nodejs/Cargo.toml +++ b/bindings/nodejs/Cargo.toml @@ -25,6 +25,7 @@ napi = { version = "2.16", default-features = false, features = [ ] } napi-derive = "2.16" once_cell = "1.18" +serde_json = "1.0" [build-dependencies] napi-build = "2" diff --git a/bindings/nodejs/README.md b/bindings/nodejs/README.md index dfe56f23..7c1121c2 100644 --- a/bindings/nodejs/README.md +++ b/bindings/nodejs/README.md @@ -37,7 +37,7 @@ while (row) { } // get rows of map -const rows = await conn.queryIterMap("SELECT * FROM test"); +const rows = await conn.queryIter("SELECT * FROM test"); let row = await rows.next(); while (row) { console.log(row.data()); @@ -68,15 +68,15 @@ while (row) { ### Semi-Structured Data Types -| Databend | Node.js | -| ----------- | -------- | -| `ARRAY` | `Array` | -| `TUPLE` | `Array` | -| `MAP` | `Object` | -| `VARIANT` | `String` | -| `BITMAP` | `String` | -| `GEOMETRY` | `String` | -| `GEOGRAPHY` | `String` | +| Databend | Node.js | +| ----------- | ----------------- | +| `ARRAY` | `Array` | +| `TUPLE` | `Array` | +| `MAP` | `Object` | +| `VARIANT` | `String / Object` | +| `BITMAP` | `String` | +| `GEOMETRY` | `String` | +| `GEOGRAPHY` | `String` | Note: `VARIANT` is a json encoded string. Example: @@ -94,6 +94,14 @@ const value = JSON.parse(data); console.log(value); ``` +We also provide a helper function to convert `VARIANT` to `Object`: + +```javascript +const row = await conn.queryRow("SELECT * FROM example limit 1;"); +row.setOpts({ variantAsObject: true }); +console.log(row.data()); +``` + ## Development ```shell diff --git a/bindings/nodejs/generated.js b/bindings/nodejs/generated.js index 98f59e2e..c04c92f3 100644 --- a/bindings/nodejs/generated.js +++ b/bindings/nodejs/generated.js @@ -268,17 +268,16 @@ if (!nativeBinding) { throw new Error(`Failed to load native binding`) } -const { Client, Connection, ConnectionInfo, Schema, Field, RowIterator, NamedRowIterator, RowIteratorExt, RowOrStats, Row, NamedRow, ServerStats } = nativeBinding +const { ValueOptions, Client, Connection, ConnectionInfo, Schema, Field, RowIterator, RowIteratorExt, RowOrStats, Row, ServerStats } = nativeBinding +module.exports.ValueOptions = ValueOptions module.exports.Client = Client module.exports.Connection = Connection module.exports.ConnectionInfo = ConnectionInfo module.exports.Schema = Schema module.exports.Field = Field module.exports.RowIterator = RowIterator -module.exports.NamedRowIterator = NamedRowIterator module.exports.RowIteratorExt = RowIteratorExt module.exports.RowOrStats = RowOrStats module.exports.Row = Row -module.exports.NamedRow = NamedRow module.exports.ServerStats = ServerStats diff --git a/bindings/nodejs/index.d.ts b/bindings/nodejs/index.d.ts index ef48c421..44e98097 100644 --- a/bindings/nodejs/index.d.ts +++ b/bindings/nodejs/index.d.ts @@ -19,9 +19,12 @@ /* auto-generated by NAPI-RS */ +export class ValueOptions { + variantAsObject: boolean +} export class Client { /** Create a new databend client with a given DSN. */ - constructor(dsn: string) + constructor(dsn: string, opts?: ValueOptions | undefined | null) /** Get a connection from the client. */ getConn(): Promise } @@ -38,8 +41,6 @@ export class Connection { queryAll(sql: string): Promise> /** Execute a SQL query, and return all rows. */ queryIter(sql: string): Promise - /** Execute a SQL query, and return all rows keyed by column name. */ - queryIterMap(sql: string): Promise /** Execute a SQL query, and return all rows with schema and stats. */ queryIterExt(sql: string): Promise /** @@ -72,13 +73,6 @@ export class RowIterator { /** Get Schema for rows. */ schema(): Schema } -export class NamedRowIterator { - /** - * Fetch next row. - * Returns `None` if there are no more rows. - */ - next(): Promise -} export class RowIteratorExt { /** * Fetch next row or stats. @@ -93,9 +87,8 @@ export class RowOrStats { get stats(): ServerStats | null } export class Row { + setOpts(opts: ValueOptions): void values(): Array -} -export class NamedRow { data(): Record } export class ServerStats { diff --git a/bindings/nodejs/src/lib.rs b/bindings/nodejs/src/lib.rs index d7ef4c74..85b2b3eb 100644 --- a/bindings/nodejs/src/lib.rs +++ b/bindings/nodejs/src/lib.rs @@ -28,10 +28,141 @@ static VERSION: Lazy = Lazy::new(|| { }); #[napi] -pub struct Client(databend_driver::Client); +#[derive(Clone, Debug, Default)] +pub struct ValueOptions { + pub variant_as_object: bool, +} + +#[napi] +impl FromNapiValue for ValueOptions { + unsafe fn from_napi_value(env: sys::napi_env, val: sys::napi_value) -> Result { + let mut opts = ValueOptions::default(); + let obj = Object::from_napi_value(env, val)?; + if let Some(val) = obj.get("variantAsObject")? { + opts.variant_as_object = val; + } + Ok(opts) + } +} + +#[napi] +pub struct Client { + inner: databend_driver::Client, + opts: ValueOptions, +} + +#[napi] +impl Client { + /// Create a new databend client with a given DSN. + #[napi(constructor)] + pub fn new(dsn: String, opts: Option) -> Self { + let name = format!("databend-driver-nodejs/{}", VERSION.as_str()); + let client = databend_driver::Client::new(dsn).with_name(name); + Self { + inner: client, + opts: opts.unwrap_or_default(), + } + } + + /// Get a connection from the client. + #[napi] + pub async fn get_conn(&self) -> Result { + let conn = self.inner.get_conn().await.map_err(format_napi_error)?; + Ok(Connection::new(conn, self.opts.clone())) + } +} #[napi] -pub struct Connection(Box); +pub struct Connection { + inner: Box, + opts: ValueOptions, +} + +impl Connection { + pub fn new(inner: Box, opts: ValueOptions) -> Self { + Self { inner, opts } + } +} + +#[napi] +impl Connection { + /// Get the connection information. + #[napi] + pub async fn info(&self) -> ConnectionInfo { + ConnectionInfo(self.inner.info().await) + } + + /// Get the databend version. + #[napi] + pub async fn version(&self) -> Result { + self.inner.version().await.map_err(format_napi_error) + } + + /// Execute a SQL query, return the number of affected rows. + #[napi] + pub async fn exec(&self, sql: String) -> Result { + self.inner.exec(&sql).await.map_err(format_napi_error) + } + + /// Execute a SQL query, and only return the first row. + #[napi] + pub async fn query_row(&self, sql: String) -> Result> { + let ret = self + .inner + .query_row(&sql) + .await + .map_err(format_napi_error)?; + let row = ret.map(|r| Row::new(r, self.opts.clone())); + Ok(row) + } + + /// Execute a SQL query and fetch all data into the result + #[napi] + pub async fn query_all(&self, sql: String) -> Result> { + Ok(self + .inner + .query_all(&sql) + .await + .map_err(format_napi_error)? + .into_iter() + .map(|r| Row::new(r, self.opts.clone())) + .collect()) + } + + /// Execute a SQL query, and return all rows. + #[napi] + pub async fn query_iter(&self, sql: String) -> Result { + let iterator = self + .inner + .query_iter(&sql) + .await + .map_err(format_napi_error)?; + Ok(RowIterator::new(iterator, self.opts.clone())) + } + + /// Execute a SQL query, and return all rows with schema and stats. + #[napi] + pub async fn query_iter_ext(&self, sql: String) -> Result { + let iterator = self + .inner + .query_iter_ext(&sql) + .await + .map_err(format_napi_error)?; + Ok(RowIteratorExt::new(iterator, self.opts.clone())) + } + + /// Load data with stage attachment. + /// The SQL can be `INSERT INTO tbl VALUES` or `REPLACE INTO tbl VALUES`. + #[napi] + pub async fn stream_load(&self, sql: String, data: Vec>) -> Result { + let ss = self + .inner + .stream_load(&sql, data) + .await + .map_err(format_napi_error)?; + Ok(ServerStats(ss)) + } +} #[napi] pub struct ConnectionInfo(databend_driver::ConnectionInfo); @@ -69,12 +200,21 @@ impl ConnectionInfo { } } -pub struct Value(databend_driver::Value); +pub struct Value<'v> { + inner: &'v databend_driver::Value, + opts: &'v ValueOptions, +} -impl ToNapiValue for Value { +impl<'v> Value<'v> { + pub fn new(inner: &'v databend_driver::Value, opts: &'v ValueOptions) -> Self { + Self { inner, opts } + } +} + +impl<'v> ToNapiValue for Value<'v> { unsafe fn to_napi_value(env: sys::napi_env, val: Self) -> Result { let ctx = Env::from(env); - match val.0 { + match val.inner { databend_driver::Value::Null => Null::to_napi_value(env, Null), databend_driver::Value::EmptyArray => { let arr = ctx.create_array(0)?; @@ -84,16 +224,22 @@ impl ToNapiValue for Value { let obj = ctx.create_object()?; Object::to_napi_value(env, obj) } - databend_driver::Value::Boolean(b) => bool::to_napi_value(env, b), - databend_driver::Value::Binary(b) => Buffer::to_napi_value(env, b.into()), - databend_driver::Value::String(s) => String::to_napi_value(env, s), - databend_driver::Value::Number(n) => NumberValue::to_napi_value(env, NumberValue(n)), + databend_driver::Value::Boolean(b) => bool::to_napi_value(env, *b), + databend_driver::Value::Binary(b) => { + Buffer::to_napi_value(env, Buffer::from(b.as_slice())) + } + databend_driver::Value::String(s) => String::to_napi_value(env, s.to_string()), + databend_driver::Value::Number(n) => { + NumberValue::to_napi_value(env, NumberValue(n.clone())) + } databend_driver::Value::Timestamp(_) => { - let v = NaiveDateTime::try_from(val.0).map_err(format_napi_error)?; + let inner = val.inner.clone(); + let v = NaiveDateTime::try_from(inner).map_err(format_napi_error)?; NaiveDateTime::to_napi_value(env, v) } databend_driver::Value::Date(_) => { - let v = NaiveDate::try_from(val.0).map_err(format_napi_error)?; + let inner = val.inner.clone(); + let v = NaiveDate::try_from(inner).map_err(format_napi_error)?; NaiveDateTime::to_napi_value( env, NaiveDateTime::new(v, NaiveTime::from_hms_opt(0, 0, 0).unwrap()), @@ -102,28 +248,36 @@ impl ToNapiValue for Value { databend_driver::Value::Array(inner) => { let mut arr = ctx.create_array(inner.len() as u32)?; for (i, v) in inner.into_iter().enumerate() { - arr.set(i as u32, Value(v))?; + arr.set(i as u32, Value::new(v, val.opts))?; } Array::to_napi_value(env, arr) } databend_driver::Value::Map(inner) => { let mut obj = ctx.create_object()?; for (k, v) in inner.into_iter() { - obj.set(k.to_string(), Value(v))?; + obj.set(k.to_string(), Value::new(v, val.opts))?; } Object::to_napi_value(env, obj) } databend_driver::Value::Tuple(inner) => { let mut arr = ctx.create_array(inner.len() as u32)?; for (i, v) in inner.into_iter().enumerate() { - arr.set(i as u32, Value(v))?; + arr.set(i as u32, Value::new(v, val.opts))?; } Array::to_napi_value(env, arr) } - databend_driver::Value::Bitmap(s) => String::to_napi_value(env, s), - databend_driver::Value::Variant(s) => String::to_napi_value(env, s), - databend_driver::Value::Geometry(s) => String::to_napi_value(env, s), - databend_driver::Value::Geography(s) => String::to_napi_value(env, s), + databend_driver::Value::Bitmap(s) => String::to_napi_value(env, s.to_string()), + databend_driver::Value::Variant(s) => { + if val.opts.variant_as_object { + let val: serde_json::Value = serde_json::from_str(s) + .map_err(|e| Error::from_reason(format!("parse variant error: {}", e)))?; + serde_json::Value::to_napi_value(env, val) + } else { + String::to_napi_value(env, s.to_string()) + } + } + databend_driver::Value::Geometry(s) => String::to_napi_value(env, s.to_string()), + databend_driver::Value::Geography(s) => String::to_napi_value(env, s.to_string()), } } } @@ -181,7 +335,16 @@ impl Field { } #[napi] -pub struct RowIterator(databend_driver::RowIterator); +pub struct RowIterator { + inner: databend_driver::RowIterator, + opts: ValueOptions, +} + +impl RowIterator { + pub fn new(inner: databend_driver::RowIterator, opts: ValueOptions) -> Self { + Self { inner, opts } + } +} #[napi] impl RowIterator { @@ -190,41 +353,30 @@ impl RowIterator { #[napi] #[allow(clippy::missing_safety_doc)] pub async unsafe fn next(&mut self) -> Option> { - self.0 - .next() - .await - .map(|row| row.map(Row).map_err(format_napi_error)) + self.inner.next().await.map(|row| { + row.map(|r| Row::new(r, self.opts.clone())) + .map_err(format_napi_error) + }) } /// Get Schema for rows. #[napi] pub fn schema(&self) -> Schema { - Schema(self.0.schema().clone()) + Schema(self.inner.schema().clone()) } } #[napi] -pub struct NamedRowIterator(databend_driver::RowIterator); +pub struct RowIteratorExt { + inner: databend_driver::RowStatsIterator, + opts: ValueOptions, +} -#[napi] -impl NamedRowIterator { - /// Fetch next row. - /// Returns `None` if there are no more rows. - #[napi] - #[allow(clippy::missing_safety_doc)] - pub async unsafe fn next(&mut self) -> Option> { - self.0.next().await.map(|row| { - row.map(|r| NamedRow { - schema: self.0.schema().clone(), - row: r, - }) - .map_err(format_napi_error) - }) +impl RowIteratorExt { + pub fn new(inner: databend_driver::RowStatsIterator, opts: ValueOptions) -> Self { + Self { inner, opts } } } -#[napi] -pub struct RowIteratorExt(databend_driver::RowStatsIterator); - #[napi] impl RowIteratorExt { /// Fetch next row or stats. @@ -232,12 +384,12 @@ impl RowIteratorExt { #[napi] #[allow(clippy::missing_safety_doc)] pub async unsafe fn next(&mut self) -> Option> { - match self.0.next().await { + match self.inner.next().await { None => None, Some(r0) => match r0 { Ok(r1) => match r1 { - databend_driver::RowWithStats::Row(row) => Some(Ok(RowOrStats { - row: Some(Row(row)), + databend_driver::RowWithStats::Row(r) => Some(Ok(RowOrStats { + row: Some(Row::new(r, self.opts.clone())), stats: None, })), databend_driver::RowWithStats::Stats(ss) => Some(Ok(RowOrStats { @@ -252,7 +404,7 @@ impl RowIteratorExt { #[napi] pub fn schema(&self) -> Schema { - Schema(self.0.schema().clone()) + Schema(self.inner.schema().clone()) } } @@ -278,37 +430,44 @@ impl RowOrStats { #[napi] #[derive(Clone)] -pub struct Row(databend_driver::Row); +pub struct Row { + inner: databend_driver::Row, + opts: ValueOptions, +} -#[napi] impl Row { - #[napi] - pub fn values(&self) -> Vec { - // FIXME: do not clone - self.0.values().iter().map(|v| Value(v.clone())).collect() + pub fn new(inner: databend_driver::Row, opts: ValueOptions) -> Self { + Self { inner, opts } } } #[napi] -#[derive(Clone)] -pub struct NamedRow { - schema: databend_driver::SchemaRef, - row: databend_driver::Row, -} +impl Row { + #[napi] + pub fn set_opts(&mut self, opts: ValueOptions) { + self.opts = opts; + } + + #[napi] + pub fn values(&self) -> Vec { + self.inner + .values() + .iter() + .map(|v| Value::new(v, &self.opts)) + .collect() + } -#[napi] -impl NamedRow { #[napi] pub fn data(&self) -> HashMap { let mut map = HashMap::new(); - for (name, value) in self - .schema + let schema = self.inner.schema(); + for (name, value) in schema .fields() .iter() .map(|f| f.name.to_string()) - .zip(self.row.values().iter()) + .zip(self.inner.values().iter()) { - map.insert(name.clone(), Value(value.clone())); + map.insert(name.clone(), Value::new(value, &self.opts)); } map } @@ -356,114 +515,6 @@ impl ServerStats { } } -#[napi] -impl Client { - /// Create a new databend client with a given DSN. - #[napi(constructor)] - pub fn new(dsn: String) -> Self { - let name = format!("databend-driver-nodejs/{}", VERSION.as_str()); - let client = databend_driver::Client::new(dsn).with_name(name); - Self(client) - } - - /// Get a connection from the client. - #[napi] - pub async fn get_conn(&self) -> Result { - self.0 - .get_conn() - .await - .map(Connection) - .map_err(format_napi_error) - } -} - -#[napi] -impl Connection { - /// Get the connection information. - #[napi] - pub async fn info(&self) -> ConnectionInfo { - ConnectionInfo(self.0.info().await) - } - - /// Get the databend version. - #[napi] - pub async fn version(&self) -> Result { - self.0.version().await.map_err(format_napi_error) - } - - /// Execute a SQL query, return the number of affected rows. - #[napi] - pub async fn exec(&self, sql: String) -> Result { - self.0.exec(&sql).await.map_err(format_napi_error) - } - - /// Execute a SQL query, and only return the first row. - #[napi] - pub async fn query_row(&self, sql: String) -> Result> { - self.0 - .query_row(&sql) - .await - .map(|row| row.map(Row)) - .map_err(format_napi_error) - } - - /// Execute a SQL query and fetch all data into the result - #[napi] - pub async fn query_all(&self, sql: String) -> Result> { - Ok(self - .0 - .query_all(&sql) - .await - .map_err(format_napi_error)? - .into_iter() - .map(Row) - .collect()) - } - - /// Execute a SQL query, and return all rows. - #[napi] - pub async fn query_iter(&self, sql: String) -> Result { - self.0 - .query_iter(&sql) - .await - .map(RowIterator) - .map_err(format_napi_error) - } - - /// Execute a SQL query, and return all rows keyed by column name. - #[napi] - pub async fn query_iter_map(&self, sql: String) -> Result { - self.0 - .query_iter(&sql) - .await - .map(NamedRowIterator) - .map_err(format_napi_error) - } - - /// Execute a SQL query, and return all rows with schema and stats. - #[napi] - pub async fn query_iter_ext(&self, sql: String) -> Result { - let iterator = self - .0 - .query_iter_ext(&sql) - .await - .map_err(format_napi_error)?; - Ok(RowIteratorExt(iterator)) - } - - /// Load data with stage attachment. - /// The SQL can be `INSERT INTO tbl VALUES` or `REPLACE INTO tbl VALUES`. - #[napi] - pub async fn stream_load(&self, sql: String, data: Vec>) -> Result { - let ss = self - .0 - .stream_load(&sql, data) - .await - .map_err(format_napi_error)?; - Ok(ServerStats(ss)) - } -} - fn format_napi_error(err: databend_driver::Error) -> Error { Error::from_reason(format!("{}", err)) } diff --git a/bindings/nodejs/tests/binding.js b/bindings/nodejs/tests/binding.js index 8dfd79bc..d057cb25 100644 --- a/bindings/nodejs/tests/binding.js +++ b/bindings/nodejs/tests/binding.js @@ -24,7 +24,11 @@ const dsn = process.env.TEST_DATABEND_DSN Given("A new Databend Driver Client", async function () { this.client = new Client(dsn); - this.conn = await this.client.getConn(); + const conn = await this.client.getConn(); + if (!conn) { + assert.fail("No connection returned"); + } + this.conn = conn; }); Then("Select string {string} should be equal to {string}", async function (input, output) { @@ -63,6 +67,33 @@ Then("Select types should be expected native types", async function () { const row = await this.conn.queryRow(`SELECT (10, '20', to_datetime('2024-04-16 12:34:56.789'))`); assert.deepEqual(row.values(), [[10, "20", new Date("2024-04-16T12:34:56.789Z")]]); } + + // Variant as String + { + const value = + '{"customer_id": 123, "order_id": 1001, "items": [{"name": "Shoes", "price": 59.99}, {"name": "T-shirt", "price": 19.99}]}'; + const row = await this.conn.queryRow(`SELECT parse_json('${value}')`); + assert.deepEqual( + row.values()[0], + '{"customer_id":123,"items":[{"name":"Shoes","price":59.99},{"name":"T-shirt","price":19.99}],"order_id":1001}', + ); + } + + // Variant as Object + { + const value = + '{"customer_id": 123, "order_id": 1001, "items": [{"name": "Shoes", "price": 59.99}, {"name": "T-shirt", "price": 19.99}]}'; + const row = await this.conn.queryRow(`SELECT parse_json('${value}')`); + row.setOpts({ variantAsObject: true }); + assert.deepEqual(row.values()[0], { + customer_id: 123, + order_id: 1001, + items: [ + { name: "Shoes", price: 59.99 }, + { name: "T-shirt", price: 19.99 }, + ], + }); + } }); Then("Select numbers should iterate all rows", async function () { @@ -79,9 +110,9 @@ Then("Select numbers should iterate all rows", async function () { assert.deepEqual(ret, expected); } - // iter names + // iter return with field names { - let rows = await this.conn.queryIterMap("SELECT number as n FROM numbers(5)"); + let rows = await this.conn.queryIter("SELECT number as n FROM numbers(5)"); let ret = []; let row = await rows.next(); while (row) { diff --git a/driver/src/conn.rs b/driver/src/conn.rs index 93283d4b..1a97faf2 100644 --- a/driver/src/conn.rs +++ b/driver/src/conn.rs @@ -150,6 +150,7 @@ pub trait Connection: DynClone + Send + Sync { validate_local_scheme(local_dsn.scheme())?; let mut results = Vec::new(); let stage_location = StageLocation::try_from(stage)?; + let schema = Arc::new(put_get_schema()); for entry in glob::glob(local_dsn.path())? { let entry = entry?; let filename = entry @@ -183,15 +184,17 @@ pub trait Connection: DynClone + Send + Sync { running_time_ms: 0.0, }; results.push(Ok(RowWithStats::Stats(ss))); - results.push(Ok(RowWithStats::Row(Row::from_vec(vec![ - Value::String(fname), - Value::String(status), - Value::Number(NumberValue::UInt64(size)), - ])))); + results.push(Ok(RowWithStats::Row(Row::from_vec( + schema.clone(), + vec![ + Value::String(fname), + Value::String(status), + Value::Number(NumberValue::UInt64(size)), + ], + )))); } - let schema = put_get_schema(); Ok(RowStatsIterator::new( - Arc::new(schema), + schema, Box::pin(tokio_stream::iter(results)), )) } @@ -208,6 +211,7 @@ pub trait Connection: DynClone + Send + Sync { let list_sql = format!("LIST {}", location); let mut response = self.query_iter(&list_sql).await?; let mut results = Vec::new(); + let schema = Arc::new(put_get_schema()); while let Some(row) = response.next().await { let (mut name, _, _, _, _): (String, u64, Option, String, Option) = row?.try_into().map_err(Error::Parsing)?; @@ -236,15 +240,17 @@ pub trait Connection: DynClone + Send + Sync { running_time_ms: 0.0, }; results.push(Ok(RowWithStats::Stats(ss))); - results.push(Ok(RowWithStats::Row(Row::from_vec(vec![ - Value::String(local_file.to_string_lossy().to_string()), - Value::String(status), - Value::Number(NumberValue::UInt64(size)), - ])))); + results.push(Ok(RowWithStats::Row(Row::from_vec( + schema.clone(), + vec![ + Value::String(local_file.to_string_lossy().to_string()), + Value::String(status), + Value::Number(NumberValue::UInt64(size)), + ], + )))); } - let schema = put_get_schema(); Ok(RowStatsIterator::new( - Arc::new(schema), + schema, Box::pin(tokio_stream::iter(results)), )) } diff --git a/sql/src/rows.rs b/sql/src/rows.rs index 8ded6e0d..9b6fa3eb 100644 --- a/sql/src/rows.rs +++ b/sql/src/rows.rs @@ -84,36 +84,47 @@ impl From for ServerStats { } #[derive(Clone, Debug, Default)] -pub struct Row(Vec); - -impl TryFrom<(SchemaRef, Vec>)> for Row { - type Error = Error; - - fn try_from((schema, data): (SchemaRef, Vec>)) -> Result { - let mut values: Vec = Vec::new(); - for (i, field) in schema.fields().iter().enumerate() { - let val: Option<&str> = data.get(i).and_then(|v| v.as_deref()); - values.push(Value::try_from((&field.data_type, val))?); - } - Ok(Self(values)) - } +pub struct Row { + schema: SchemaRef, + values: Vec, } impl Row { + pub fn new(schema: SchemaRef, values: Vec) -> Self { + Self { schema, values } + } + pub fn len(&self) -> usize { - self.0.len() + self.values.len() } pub fn is_empty(&self) -> bool { - self.0.is_empty() + self.values.is_empty() } pub fn values(&self) -> &[Value] { - &self.0 + &self.values } - pub fn from_vec(values: Vec) -> Self { - Self(values) + pub fn schema(&self) -> SchemaRef { + self.schema.clone() + } + + pub fn from_vec(schema: SchemaRef, values: Vec) -> Self { + Self { schema, values } + } +} + +impl TryFrom<(SchemaRef, Vec>)> for Row { + type Error = Error; + + fn try_from((schema, data): (SchemaRef, Vec>)) -> Result { + let mut values: Vec = Vec::new(); + for (i, field) in schema.fields().iter().enumerate() { + let val: Option<&str> = data.get(i).and_then(|v| v.as_deref()); + values.push(Value::try_from((&field.data_type, val))?); + } + Ok(Self::new(schema, values)) } } @@ -122,36 +133,55 @@ impl IntoIterator for Row { type IntoIter = std::vec::IntoIter; fn into_iter(self) -> Self::IntoIter { - self.0.into_iter() + self.values.into_iter() } } #[derive(Clone, Debug)] pub struct Rows { - schema: SchemaRef, rows: Vec, } +impl Rows { + pub fn new(rows: Vec) -> Self { + Self { rows } + } + + // pub fn schema(&self) -> SchemaRef { + // self.schema.clone() + // } + + pub fn rows(&self) -> &[Row] { + &self.rows + } + + pub fn len(&self) -> usize { + self.rows.len() + } + + pub fn is_empty(&self) -> bool { + self.rows.is_empty() + } +} + #[cfg(feature = "flight-sql")] impl TryFrom for Rows { type Error = Error; fn try_from(batch: RecordBatch) -> Result { - let schema = batch.schema(); + let batch_schema = batch.schema(); + let schema = SchemaRef::new(batch_schema.clone().try_into()?); let mut rows: Vec = Vec::new(); for i in 0..batch.num_rows() { let mut values: Vec = Vec::new(); - for j in 0..schema.fields().len() { + for j in 0..batch_schema.fields().len() { let v = batch.column(j); - let field = schema.field(j); + let field = batch_schema.field(j); let value = Value::try_from((field, v, i))?; values.push(value); } - rows.push(Row(values)); + rows.push(Row::new(schema.clone(), values)); } - Ok(Self { - schema: std::sync::Arc::new(schema.try_into()?), - rows, - }) + Ok(Self::new(rows)) } } @@ -164,24 +194,6 @@ impl IntoIterator for Rows { } } -impl Rows { - pub fn schema(&self) -> SchemaRef { - self.schema.clone() - } - - pub fn rows(&self) -> &[Row] { - &self.rows - } - - pub fn len(&self) -> usize { - self.rows.len() - } - - pub fn is_empty(&self) -> bool { - self.rows.is_empty() - } -} - macro_rules! replace_expr { ($_t:tt $sub:expr) => { $sub diff --git a/sql/src/schema.rs b/sql/src/schema.rs index 8c408faf..8e954b64 100644 --- a/sql/src/schema.rs +++ b/sql/src/schema.rs @@ -162,7 +162,7 @@ pub struct Field { pub data_type: DataType, } -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Default)] pub struct Schema(Vec); pub type SchemaRef = Arc;