From ab719f1f6c155fa1576597146ad2001fd0963506 Mon Sep 17 00:00:00 2001 From: everpcpc Date: Mon, 20 Nov 2023 14:25:50 +0800 Subject: [PATCH] feat(bindings/python): add blocking client --- bindings/python/README.md | 65 +++- bindings/python/src/asyncio.rs | 106 ++++++ bindings/python/src/blocking.rs | 114 ++++++ bindings/python/src/lib.rs | 328 +----------------- bindings/python/src/types.rs | 284 +++++++++++++++ bindings/python/tests/asyncio/binding.feature | 1 + .../tests/{ => asyncio}/steps/binding.py | 0 bindings/python/tests/binding.feature | 1 - .../python/tests/blocking/binding.feature | 1 + .../python/tests/blocking/steps/binding.py | 108 ++++++ tests/Makefile | 2 +- 11 files changed, 693 insertions(+), 317 deletions(-) create mode 100644 bindings/python/src/asyncio.rs create mode 100644 bindings/python/src/blocking.rs create mode 100644 bindings/python/src/types.rs create mode 120000 bindings/python/tests/asyncio/binding.feature rename bindings/python/tests/{ => asyncio}/steps/binding.py (100%) delete mode 120000 bindings/python/tests/binding.feature create mode 120000 bindings/python/tests/blocking/binding.feature create mode 100644 bindings/python/tests/blocking/steps/binding.py diff --git a/bindings/python/README.md b/bindings/python/README.md index a170329f4..fedb4764d 100644 --- a/bindings/python/README.md +++ b/bindings/python/README.md @@ -9,6 +9,34 @@ maturin develop ## Usage + +### Blocking + +```python +from databend_driver import BlockingDatabendClient + +client = BlockingDatabendClient('databend+http://root:root@localhost:8000/?sslmode=disable') +conn = client.get_conn() +conn.exec( + """ + CREATE TABLE test ( + i64 Int64, + u64 UInt64, + f64 Float64, + s String, + s2 String, + d Date, + t DateTime + ) + """ +) +rows = conn.query_iter("SELECT * FROM test") +for row in rows: + print(row.values()) +``` + +### Asyncio + ```python import asyncio from databend_driver import AsyncDatabendClient @@ -58,6 +86,27 @@ class AsyncDatabendConnection: async def stream_load(self, sql: str, data: list[list[str]]) -> ServerStats: ... ``` +### BlockingDatabendClient + +```python +class BlockingDatabendClient: + def __init__(self, dsn: str): ... + def get_conn(self) -> BlockingDatabendConnection: ... +``` + +### BlockingDatabendConnection + +```python +class BlockingDatabendConnection: + def info(self) -> ConnectionInfo: ... + def version(self) -> str: ... + def exec(self, sql: str) -> int: ... + def query_row(self, sql: str) -> Row: ... + def query_iter(self, sql: str) -> RowIterator: ... + def stream_load(self, sql: str, data: list[list[str]]) -> ServerStats: ... +``` + + ### Row ```python @@ -69,9 +118,13 @@ class Row: ```python class RowIterator: + def schema(self) -> Schema: ... + + def __iter__(self) -> RowIterator: ... + def __next__(self) -> Row: ... + def __aiter__(self) -> RowIterator: ... async def __anext__(self) -> Row: ... - def schema(self) -> Schema: ... ``` ### Field @@ -131,8 +184,14 @@ class ConnectionInfo: ## Development +``` +cd tests +make up +``` + ```shell +cd bindings/python pipenv install --dev -maturin develop -pipenv run behave tests +pipenv run maturin develop +pipenv run behave tests/* ``` diff --git a/bindings/python/src/asyncio.rs b/bindings/python/src/asyncio.rs new file mode 100644 index 000000000..f2d9b863c --- /dev/null +++ b/bindings/python/src/asyncio.rs @@ -0,0 +1,106 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use pyo3::exceptions::PyException;
+use pyo3::prelude::*;
+use pyo3_asyncio::tokio::future_into_py;
+
+use crate::types::{ConnectionInfo, Row, RowIterator, ServerStats};
+
+#[pyclass(module = "databend_driver")]
+pub struct AsyncDatabendClient(databend_driver::Client);
+
+#[pymethods]
+impl AsyncDatabendClient {
+    #[new]
+    #[pyo3(signature = (dsn))]
+    pub fn new(dsn: String) -> PyResult<Self> {
+        let client = databend_driver::Client::new(dsn);
+        Ok(Self(client))
+    }
+
+    pub fn get_conn<'p>(&'p self, py: Python<'p>) -> PyResult<&'p PyAny> {
+        let this = self.0.clone();
+        future_into_py(py, async move {
+            let conn = this.get_conn().await.unwrap();
+            Ok(AsyncDatabendConnection(conn))
+        })
+    }
+}
+
+#[pyclass(module = "databend_driver")]
+pub struct AsyncDatabendConnection(Box<dyn databend_driver::Connection>);
+
+#[pymethods]
+impl AsyncDatabendConnection {
+    pub fn info<'p>(&'p self, py: Python<'p>) -> PyResult<&'p PyAny> {
+        let this = self.0.clone();
+        future_into_py(py, async move {
+            let info = this.info().await;
+            Ok(ConnectionInfo::new(info))
+        })
+    }
+
+    pub fn version<'p>(&'p self, py: Python<'p>) -> PyResult<&'p PyAny> {
+        let this = self.0.clone();
+        future_into_py(py, async move {
+            let version = this.version().await.unwrap();
+            Ok(version)
+        })
+    }
+
+    pub fn exec<'p>(&'p self, py: Python<'p>, sql: String) -> PyResult<&'p PyAny> {
+        let this = self.0.clone();
+        future_into_py(py, async move {
+            let res = this.exec(&sql).await.unwrap();
+            Ok(res)
+        })
+    }
+
+    pub fn query_row<'p>(&'p self, py: Python<'p>, sql: String) -> PyResult<&'p PyAny> {
+        let this = self.0.clone();
+        future_into_py(py, async move {
+            let row = this.query_row(&sql).await.unwrap();
+            Ok(Row::new(row.unwrap()))
+        })
+    }
+
+    pub fn query_iter<'p>(&'p self, py: Python<'p>, sql: String) -> PyResult<&'p PyAny> {
+        let this = self.0.clone();
+        future_into_py(py, async move {
+            let streamer = this.query_iter(&sql).await.unwrap();
+            Ok(RowIterator::new(streamer))
+        })
+    }
+
+    pub fn stream_load<'p>(
+        &self,
+        py: Python<'p>,
+        sql: String,
+        data: Vec<Vec<String>>,
+    ) -> PyResult<&'p PyAny> {
+        let this = self.0.clone();
+        future_into_py(py, async move {
+            let data = data
+                .iter()
+                .map(|v| v.iter().map(|s| s.as_ref()).collect())
+                .collect();
+            let ss = this
+                .stream_load(&sql, data)
+                .await
+                .map_err(|e| PyException::new_err(format!("{}", e)))?;
+            Ok(ServerStats::new(ss))
+        })
+    }
+}
diff --git a/bindings/python/src/blocking.rs b/bindings/python/src/blocking.rs
new file mode 100644
index 000000000..ec9f95011
--- /dev/null
+++ b/bindings/python/src/blocking.rs
@@ -0,0 +1,114 @@
+// Copyright 2021 Datafuse Labs
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use pyo3::prelude::*;
+
+use crate::types::{ConnectionInfo, Row, RowIterator, ServerStats};
+
+#[pyclass(module = "databend_driver")]
+pub struct BlockingDatabendClient(databend_driver::Client);
+
+#[pymethods]
+impl BlockingDatabendClient {
+    #[new]
+    #[pyo3(signature = (dsn))]
+    pub fn new(dsn: String) -> PyResult<Self> {
+        let client = databend_driver::Client::new(dsn);
+        Ok(Self(client))
+    }
+
+    pub fn get_conn(&self) -> PyResult<BlockingDatabendConnection> {
+        let this = self.0.clone();
+        let rt = tokio::runtime::Runtime::new()?;
+        let ret = rt.block_on(async move {
+            let conn = this.get_conn().await.unwrap();
+            conn
+        });
+        Ok(BlockingDatabendConnection(ret))
+    }
+}
+
+#[pyclass(module = "databend_driver")]
+pub struct BlockingDatabendConnection(Box<dyn databend_driver::Connection>);
+
+#[pymethods]
+impl BlockingDatabendConnection {
+    pub fn info(&self) -> PyResult<ConnectionInfo> {
+        let this = self.0.clone();
+        let rt = tokio::runtime::Runtime::new()?;
+        let ret = rt.block_on(async move {
+            let info = this.info().await;
+            info
+        });
+        Ok(ConnectionInfo::new(ret))
+    }
+
+    pub fn version(&self) -> PyResult<String> {
+        let this = self.0.clone();
+        let rt = tokio::runtime::Runtime::new()?;
+        let ret = rt.block_on(async move {
+            let version = this.version().await.unwrap();
+            version
+        });
+        Ok(ret)
+    }
+
+    pub fn exec(&self, sql: String) -> PyResult<i64> {
+        let this = self.0.clone();
+        let rt = tokio::runtime::Runtime::new()?;
+        let ret = rt.block_on(async move {
+            let res = this.exec(&sql).await.unwrap();
+            res
+        });
+        Ok(ret)
+    }
+
+    pub fn query_row(&self, sql: String) -> PyResult<Option<Row>> {
+        let this = self.0.clone();
+        let rt = tokio::runtime::Runtime::new()?;
+        let ret = rt.block_on(async move {
+            let row = this.query_row(&sql).await.unwrap();
+            row
+        });
+        Ok(ret.map(Row::new))
+    }
+
+    pub fn query_iter(&self, sql: String) -> PyResult<RowIterator> {
+        let this = self.0.clone();
+        // future_into_py(py, async move {
+        //     let streamer = this.query_iter(&sql).await.unwrap();
+        //     Ok(RowIterator::new(streamer))
+        // })
+        let rt = tokio::runtime::Runtime::new()?;
+        let ret = rt.block_on(async move {
+            let streamer = this.query_iter(&sql).await.unwrap();
+            streamer
+        });
+        Ok(RowIterator::new(ret))
+    }
+
+    pub fn stream_load(&self, sql: String, data: Vec<Vec<String>>) -> PyResult<ServerStats> {
+        let this = self.0.clone();
+        let rt = tokio::runtime::Runtime::new()?;
+        let ret = rt.block_on(async move {
+            let data = data
+                .iter()
+                .map(|v| v.iter().map(|s| s.as_ref()).collect())
+                .collect();
+            let ss = this.stream_load(&sql, data).await.unwrap();
+            ss
+        });
+        Ok(ServerStats::new(ret))
+    }
+}
diff --git a/bindings/python/src/lib.rs b/bindings/python/src/lib.rs
index 37aa95278..64331a246 100644
--- a/bindings/python/src/lib.rs
+++ b/bindings/python/src/lib.rs
@@ -12,331 +12,35 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use std::sync::Arc;
+mod asyncio;
+mod blocking;
+mod types;
 
 use pyo3::create_exception;
-use pyo3::exceptions::{PyException, PyStopAsyncIteration};
+use pyo3::exceptions::PyException;
 use pyo3::prelude::*;
-use pyo3::types::{PyDict, PyList, PyTuple};
-use pyo3_asyncio::tokio::future_into_py;
-use tokio::sync::Mutex;
-use tokio_stream::StreamExt;
+
+use crate::asyncio::{AsyncDatabendClient, AsyncDatabendConnection};
+use crate::blocking::{BlockingDatabendClient, BlockingDatabendConnection};
+use crate::types::{ConnectionInfo, Row, RowIterator, Schema, ServerStats};
 
 create_exception!(
     databend_client,
     Error,
     PyException,
-    "databend_client related errors"
+    "databend_client errors"
 );
 
 #[pymodule]
 fn _databend_driver(_py: Python, m: &PyModule) -> PyResult<()> {
     m.add_class::<AsyncDatabendClient>()?;
     m.add_class::<AsyncDatabendConnection>()?;
+    m.add_class::<BlockingDatabendClient>()?;
+    m.add_class::<BlockingDatabendConnection>()?;
+    m.add_class::<ConnectionInfo>()?;
+    m.add_class::<Row>()?;
+    m.add_class::<RowIterator>()?;
+    m.add_class::<Schema>()?;
+    m.add_class::<ServerStats>()?;
     Ok(())
 }
-
-#[pyclass(module = "databend_driver")]
-pub struct AsyncDatabendClient(databend_driver::Client);
-
-#[pymethods]
-impl AsyncDatabendClient {
-    #[new]
-    #[pyo3(signature = (dsn))]
-    pub fn new(dsn: String) -> PyResult<Self> {
-        let client = databend_driver::Client::new(dsn);
-        Ok(Self(client))
-    }
-
-    pub fn get_conn<'p>(&'p self, py: Python<'p>) -> PyResult<&'p PyAny> {
-        let this = self.0.clone();
-        future_into_py(py, async move {
-            let conn = this.get_conn().await.unwrap();
-            Ok(AsyncDatabendConnection(conn))
-        })
-    }
-}
-
-#[pyclass(module = "databend_driver")]
-pub struct AsyncDatabendConnection(Box<dyn databend_driver::Connection>);
-
-#[pymethods]
-impl AsyncDatabendConnection {
-    pub fn info<'p>(&'p self, py: Python<'p>) -> PyResult<&'p PyAny> {
-        let this = self.0.clone();
-        future_into_py(py, async move {
-            let info = this.info().await;
-            Ok(ConnectionInfo(info))
-        })
-    }
-
-    pub fn version<'p>(&'p self, py: Python<'p>) -> PyResult<&'p PyAny> {
-        let this = self.0.clone();
-        future_into_py(py, async move {
-            let version = this.version().await.unwrap();
-            Ok(version)
-        })
-    }
-
-    pub fn exec<'p>(&'p self, py: Python<'p>, sql: String) -> PyResult<&'p PyAny> {
-        let this = self.0.clone();
-        future_into_py(py, async move {
-            let res = this.exec(&sql).await.unwrap();
-            Ok(res)
-        })
-    }
-
-    pub fn query_row<'p>(&'p self, py: Python<'p>, sql: String) -> PyResult<&'p PyAny> {
-        let this = self.0.clone();
-        future_into_py(py, async move {
-            let row = this.query_row(&sql).await.unwrap();
-            let row = row.unwrap();
-            Ok(Row(row))
-        })
-    }
-
-    pub fn query_iter<'p>(&'p self, py: Python<'p>, sql: String) -> PyResult<&'p PyAny> {
-        let this = self.0.clone();
-        future_into_py(py, async move {
-            let streamer = this.query_iter(&sql).await.unwrap();
-            Ok(RowIterator(Arc::new(Mutex::new(streamer))))
-        })
-    }
-
-    pub fn stream_load<'p>(
-        &self,
-        py: Python<'p>,
-        sql: String,
-        data: Vec<Vec<String>>,
-    ) -> PyResult<&'p PyAny> {
-        let this = self.0.clone();
-        future_into_py(py, async move {
-            let data = data
-                .iter()
-                .map(|v| v.iter().map(|s| s.as_ref()).collect())
-                .collect();
-            let ss = this
-                .stream_load(&sql, data)
-                .await
-                .map_err(|e| PyException::new_err(format!("{}", e)))?;
-            Ok(ServerStats(ss))
-        })
-    }
-}
-
-#[pyclass(module = "databend_driver")]
-pub struct Row(databend_driver::Row);
-
-#[pymethods]
-impl Row {
-    pub fn values<'p>(&'p self, py: Python<'p>) -> PyResult<PyObject> {
-        let res = PyTuple::new(
-            py,
-            self.0.values().iter().map(|v| Value(v.clone()).into_py(py)), // FIXME: do not clone
-        );
-        Ok(res.into_py(py))
-    }
-}
-
-pub struct Value(databend_driver::Value);
-
-impl IntoPy<PyObject> for Value {
-    fn into_py(self, py: Python<'_>) -> PyObject {
-        match self.0 {
-            databend_driver::Value::Null => py.None(),
-            databend_driver::Value::EmptyArray => {
-                let list = PyList::empty(py);
-                list.into_py(py)
-            }
-            databend_driver::Value::EmptyMap => {
-                let dict = PyDict::new(py);
-                dict.into_py(py)
-            }
-            databend_driver::Value::Boolean(b) => b.into_py(py),
-            databend_driver::Value::String(s) => s.into_py(py),
-            databend_driver::Value::Number(n) => {
-                let v = NumberValue(n);
-                v.into_py(py)
-            }
-            databend_driver::Value::Timestamp(_) => {
-                let s = self.0.to_string();
-                s.into_py(py)
-            }
-            databend_driver::Value::Date(_) => {
-                let s = self.0.to_string();
-                s.into_py(py)
-            }
-            databend_driver::Value::Array(inner) => {
-                let list = PyList::new(py, inner.into_iter().map(|v| Value(v).into_py(py)));
-                list.into_py(py)
-            }
-            databend_driver::Value::Map(inner) => {
-                let dict = PyDict::new(py);
-                for (k, v) in inner {
-                    dict.set_item(Value(k).into_py(py), Value(v).into_py(py))
-                        .unwrap();
-                }
-                dict.into_py(py)
-            }
-            databend_driver::Value::Tuple(inner) => {
-                let tuple = PyTuple::new(py, inner.into_iter().map(|v| Value(v).into_py(py)));
-                tuple.into_py(py)
-            }
-            databend_driver::Value::Bitmap(s) => s.into_py(py),
-            databend_driver::Value::Variant(s) => s.into_py(py),
-        }
-    }
-}
-
-pub struct NumberValue(databend_driver::NumberValue);
-
-impl IntoPy<PyObject> for NumberValue {
-    fn into_py(self, py: Python<'_>) -> PyObject {
-        match self.0 {
-            databend_driver::NumberValue::Int8(i) => i.into_py(py),
-            databend_driver::NumberValue::Int16(i) => i.into_py(py),
-            databend_driver::NumberValue::Int32(i) => i.into_py(py),
-            databend_driver::NumberValue::Int64(i) => i.into_py(py),
-            databend_driver::NumberValue::UInt8(i) => i.into_py(py),
-            databend_driver::NumberValue::UInt16(i) => i.into_py(py),
-            databend_driver::NumberValue::UInt32(i) => i.into_py(py),
-            databend_driver::NumberValue::UInt64(i) => i.into_py(py),
-            databend_driver::NumberValue::Float32(i) => i.into_py(py),
-            databend_driver::NumberValue::Float64(i) => i.into_py(py),
-            databend_driver::NumberValue::Decimal128(_, _) => {
-                let s = self.0.to_string();
-                s.into_py(py)
-            }
-            databend_driver::NumberValue::Decimal256(_, _) => {
-                let s = self.0.to_string();
-                s.into_py(py)
-            }
-        }
-    }
-}
-
-#[pyclass(module = "databend_driver")]
-pub struct RowIterator(Arc<Mutex<databend_driver::RowIterator>>);
-
-#[pymethods]
-impl RowIterator {
-    fn __aiter__(slf: PyRef<'_, Self>) -> PyRef<'_, Self> {
-        slf
-    }
-    fn __anext__(&self, py: Python<'_>) -> PyResult<Option<PyObject>> {
-        let streamer = self.0.clone();
-        let future = future_into_py(py, async move {
-            match streamer.lock().await.next().await {
-                Some(val) => match val {
-                    Err(e) => Err(PyException::new_err(format!("{}", e))),
-                    Ok(ret) => Ok(Row(ret)),
-                },
-                None => Err(PyStopAsyncIteration::new_err("The iterator is exhausted")),
-            }
-        });
-        Ok(Some(future?.into()))
-    }
-    fn schema<'p>(&'p self, py: Python<'p>) -> PyResult<&'p PyAny> {
-        let streamer = self.0.clone();
-        future_into_py(py, async move {
-            let schema = streamer.lock().await.schema();
-            Ok(Schema(schema))
-        })
-    }
-}
-
-#[pyclass(module = "databend_driver")]
-pub struct Schema(databend_driver::SchemaRef);
-
-#[pymethods]
-impl Schema {
-    pub fn fields<'p>(&'p self, py: Python<'p>) -> PyResult<&'p PyAny> {
-        let fields = self
-            .0
-            .fields()
-            .into_iter()
-            .map(|f| Field(f.clone()).into_py(py));
-        Ok(PyList::new(py, fields))
-    }
-}
-
-#[pyclass(module = "databend_driver")]
-pub struct Field(databend_driver::Field);
-
-#[pymethods]
-impl Field {
-    #[getter]
-    pub fn name(&self) -> String {
-        self.0.name.to_string()
-    }
-    #[getter]
-    pub fn data_type(&self) -> String {
-        self.0.data_type.to_string()
-    }
-}
-
-#[pyclass(module = "databend_driver")]
-pub struct ConnectionInfo(databend_driver::ConnectionInfo);
-
-#[pymethods]
-impl ConnectionInfo {
-    #[getter]
-    pub fn handler(&self) -> String {
-        self.0.handler.to_string()
-    }
-    #[getter]
-    pub fn host(&self) -> String {
-        self.0.host.to_string()
-    }
-    #[getter]
-    pub fn port(&self) -> u16 {
-        self.0.port
-    }
-    #[getter]
-    pub fn user(&self) -> String {
-        self.0.user.to_string()
-    }
-    #[getter]
-    pub fn database(&self) -> Option<String> {
-        self.0.database.clone()
-    }
-    #[getter]
-    pub fn warehouse(&self) -> Option<String> {
-        self.0.warehouse.clone()
-    }
-}
-
-#[pyclass(module = "databend_driver")]
-pub struct ServerStats(databend_driver::ServerStats);
-
-#[pymethods]
-impl ServerStats {
-    #[getter]
-    pub fn total_rows(&self) -> usize {
-        self.0.total_rows
-    }
-    #[getter]
-    pub fn total_bytes(&self) -> usize {
-        self.0.total_bytes
-    }
-    #[getter]
-    pub fn read_rows(&self) -> usize {
-        self.0.read_rows
-    }
-    #[getter]
-    pub fn read_bytes(&self) -> usize {
-        self.0.read_bytes
-    }
-    #[getter]
-    pub fn write_rows(&self) -> usize {
-        self.0.write_rows
-    }
-    #[getter]
-    pub fn write_bytes(&self) -> usize {
-        self.0.write_bytes
-    }
-    #[getter]
-    pub fn running_time_ms(&self) -> f64 {
-        self.0.running_time_ms
-    }
-}
diff --git a/bindings/python/src/types.rs b/bindings/python/src/types.rs
new file mode 100644
index 000000000..10e98544f
--- /dev/null
+++ b/bindings/python/src/types.rs
@@ -0,0 +1,284 @@
+// Copyright 2021 Datafuse Labs
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::sync::Arc;
+
+use pyo3::exceptions::{PyException, PyStopAsyncIteration, PyStopIteration};
+use pyo3::prelude::*;
+use pyo3::types::{PyDict, PyList, PyTuple};
+use pyo3_asyncio::tokio::future_into_py;
+use tokio::sync::Mutex;
+use tokio_stream::StreamExt;
+
+pub struct Value(databend_driver::Value);
+
+impl IntoPy<PyObject> for Value {
+    fn into_py(self, py: Python<'_>) -> PyObject {
+        match self.0 {
+            databend_driver::Value::Null => py.None(),
+            databend_driver::Value::EmptyArray => {
+                let list = PyList::empty(py);
+                list.into_py(py)
+            }
+            databend_driver::Value::EmptyMap => {
+                let dict = PyDict::new(py);
+                dict.into_py(py)
+            }
+            databend_driver::Value::Boolean(b) => b.into_py(py),
+            databend_driver::Value::String(s) => s.into_py(py),
+            databend_driver::Value::Number(n) => {
+                let v = NumberValue(n);
+                v.into_py(py)
+            }
+            databend_driver::Value::Timestamp(_) => {
+                let s = self.0.to_string();
+                s.into_py(py)
+            }
+            databend_driver::Value::Date(_) => {
+                let s = self.0.to_string();
+                s.into_py(py)
+            }
+            databend_driver::Value::Array(inner) => {
+                let list = PyList::new(py, inner.into_iter().map(|v| Value(v).into_py(py)));
+                list.into_py(py)
+            }
+            databend_driver::Value::Map(inner) => {
+                let dict = PyDict::new(py);
+                for (k, v) in inner {
+                    dict.set_item(Value(k).into_py(py), Value(v).into_py(py))
+                        .unwrap();
+                }
+                dict.into_py(py)
+            }
+            databend_driver::Value::Tuple(inner) => {
+                let tuple = PyTuple::new(py, inner.into_iter().map(|v| Value(v).into_py(py)));
+                tuple.into_py(py)
+            }
+            databend_driver::Value::Bitmap(s) => s.into_py(py),
+            databend_driver::Value::Variant(s) => s.into_py(py),
+        }
+    }
+}
+
+pub struct NumberValue(databend_driver::NumberValue);
+
+impl IntoPy<PyObject> for NumberValue {
+    fn into_py(self, py: Python<'_>) -> PyObject {
+        match self.0 {
+            databend_driver::NumberValue::Int8(i) => i.into_py(py),
+            databend_driver::NumberValue::Int16(i) => i.into_py(py),
+            databend_driver::NumberValue::Int32(i) => i.into_py(py),
+            databend_driver::NumberValue::Int64(i) => i.into_py(py),
+            databend_driver::NumberValue::UInt8(i) => i.into_py(py),
+            databend_driver::NumberValue::UInt16(i) => i.into_py(py),
+            databend_driver::NumberValue::UInt32(i) => i.into_py(py),
+            databend_driver::NumberValue::UInt64(i) => i.into_py(py),
+            databend_driver::NumberValue::Float32(i) => i.into_py(py),
+            databend_driver::NumberValue::Float64(i) => i.into_py(py),
+            databend_driver::NumberValue::Decimal128(_, _) => {
+                let s = self.0.to_string();
+                s.into_py(py)
+            }
+            databend_driver::NumberValue::Decimal256(_, _) => {
+                let s = self.0.to_string();
+                s.into_py(py)
+            }
+        }
+    }
+}
+
+#[pyclass(module = "databend_driver")]
+pub struct Row(databend_driver::Row);
+
+impl Row {
+    pub fn new(row: databend_driver::Row) -> Self {
+        Row(row)
+    }
+}
+
+#[pymethods]
+impl Row {
+    pub fn values<'p>(&'p self, py: Python<'p>) -> PyResult<PyObject> {
+        let res = PyTuple::new(
+            py,
+            self.0.values().iter().map(|v| Value(v.clone()).into_py(py)), // FIXME: do not clone
+        );
+        Ok(res.into_py(py))
+    }
+}
+
+#[pyclass(module = "databend_driver")]
+pub struct RowIterator(Arc<Mutex<databend_driver::RowIterator>>);
+
+impl RowIterator {
+    pub fn new(streamer: databend_driver::RowIterator) -> Self {
+        RowIterator(Arc::new(Mutex::new(streamer)))
+    }
+}
+
+#[pymethods]
+impl RowIterator {
+    fn schema<'p>(&self) -> PyResult<Schema> {
+        let streamer = self.0.clone();
+        let rt = tokio::runtime::Runtime::new()?;
+        let ret = rt.block_on(async move {
+            let schema = streamer.lock().await.schema();
+            schema
+        });
+        Ok(Schema(ret))
+    }
+
+    fn __iter__(slf: PyRef<'_, Self>) -> PyRef<'_, Self> {
+        slf
+    }
+    fn __next__(&self) -> PyResult<Option<Row>> {
+        let streamer = self.0.clone();
+        let rt = tokio::runtime::Runtime::new()?;
+        let ret = rt.block_on(async move {
+            match streamer.lock().await.next().await {
+                Some(val) => match val {
+                    Err(e) => Err(PyException::new_err(format!("{}", e))),
+                    Ok(ret) => Ok(Row(ret)),
+                },
+                None => Err(PyStopIteration::new_err("The iterator is exhausted")),
+            }
+        });
+        ret.map(Some)
+    }
+
+    fn __aiter__(slf: PyRef<'_, Self>) -> PyRef<'_, Self> {
+        slf
+    }
+    fn __anext__(&self, py: Python<'_>) -> PyResult<Option<PyObject>> {
+        let streamer = self.0.clone();
+        let future = future_into_py(py, async move {
+            match streamer.lock().await.next().await {
+                Some(val) => match val {
+                    Err(e) => Err(PyException::new_err(format!("{}", e))),
+                    Ok(ret) => Ok(Row(ret)),
+                },
+                None => Err(PyStopAsyncIteration::new_err("The iterator is exhausted")),
+            }
+        });
+        Ok(Some(future?.into()))
+    }
+}
+
+#[pyclass(module = "databend_driver")]
+pub struct Schema(databend_driver::SchemaRef);
+
+#[pymethods]
+impl Schema {
+    pub fn fields<'p>(&'p self, py: Python<'p>) -> PyResult<&'p PyAny> {
+        let fields = self
+            .0
+            .fields()
+            .into_iter()
+            .map(|f| Field(f.clone()).into_py(py));
+        Ok(PyList::new(py, fields))
+    }
+}
+
+#[pyclass(module = "databend_driver")]
+pub struct Field(databend_driver::Field);
+
+#[pymethods]
+impl Field {
+    #[getter]
+    pub fn name(&self) -> String {
+        self.0.name.to_string()
+    }
+    #[getter]
+    pub fn data_type(&self) -> String {
+        self.0.data_type.to_string()
+    }
+}
+
+#[pyclass(module = "databend_driver")]
+pub struct ConnectionInfo(databend_driver::ConnectionInfo);
+
+impl ConnectionInfo {
+    pub fn new(info: databend_driver::ConnectionInfo) -> Self {
+        ConnectionInfo(info)
+    }
+}
+
+#[pymethods]
+impl ConnectionInfo {
+    #[getter]
+    pub fn handler(&self) -> String {
+        self.0.handler.to_string()
+    }
+    #[getter]
+    pub fn host(&self) -> String {
+        self.0.host.to_string()
+    }
+    #[getter]
+    pub fn port(&self) -> u16 {
+        self.0.port
+    }
+    #[getter]
+    pub fn user(&self) -> String {
+        self.0.user.to_string()
+    }
+    #[getter]
+    pub fn database(&self) -> Option<String> {
+        self.0.database.clone()
+    }
+    #[getter]
+    pub fn warehouse(&self) -> Option<String> {
+        self.0.warehouse.clone()
+    }
+}
+
+#[pyclass(module = "databend_driver")]
+pub struct ServerStats(databend_driver::ServerStats);
+
+impl ServerStats {
+    pub fn new(stats: databend_driver::ServerStats) -> Self {
+        ServerStats(stats)
+    }
+}
+
+#[pymethods]
+impl ServerStats {
+    #[getter]
+    pub fn total_rows(&self) -> usize {
+        self.0.total_rows
+    }
+    #[getter]
+    pub fn total_bytes(&self) -> usize {
+        self.0.total_bytes
+    }
+    #[getter]
+    pub fn read_rows(&self) -> usize {
+        self.0.read_rows
+    }
+    #[getter]
+    pub fn read_bytes(&self) -> usize {
+        self.0.read_bytes
+    }
+    #[getter]
+    pub fn write_rows(&self) -> usize {
+        self.0.write_rows
+    }
+    #[getter]
+    pub fn write_bytes(&self) -> usize {
+        self.0.write_bytes
+    }
+    #[getter]
+    pub fn running_time_ms(&self) -> f64 {
+        self.0.running_time_ms
+    }
+}
diff --git a/bindings/python/tests/asyncio/binding.feature b/bindings/python/tests/asyncio/binding.feature
new file mode 120000
index 000000000..fcf9cd614
--- /dev/null
+++ b/bindings/python/tests/asyncio/binding.feature
@@ -0,0 +1 @@
+../../../tests/features/binding.feature
\ No newline at end of file
diff --git a/bindings/python/tests/steps/binding.py b/bindings/python/tests/asyncio/steps/binding.py
similarity index 100%
rename from bindings/python/tests/steps/binding.py
rename to bindings/python/tests/asyncio/steps/binding.py
diff --git
a/bindings/python/tests/binding.feature b/bindings/python/tests/binding.feature deleted file mode 120000 index fcb71d389..000000000 --- a/bindings/python/tests/binding.feature +++ /dev/null @@ -1 +0,0 @@ -../../tests/features/binding.feature \ No newline at end of file diff --git a/bindings/python/tests/blocking/binding.feature b/bindings/python/tests/blocking/binding.feature new file mode 120000 index 000000000..fcf9cd614 --- /dev/null +++ b/bindings/python/tests/blocking/binding.feature @@ -0,0 +1 @@ +../../../tests/features/binding.feature \ No newline at end of file diff --git a/bindings/python/tests/blocking/steps/binding.py b/bindings/python/tests/blocking/steps/binding.py new file mode 100644 index 000000000..071d733a1 --- /dev/null +++ b/bindings/python/tests/blocking/steps/binding.py @@ -0,0 +1,108 @@ +# Copyright 2021 Datafuse Labs +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os + +from behave import given, when, then +import databend_driver + + +@given("A new Databend Driver Client") +def _(context): + dsn = os.getenv( + "TEST_DATABEND_DSN", "databend+http://root:root@localhost:8000/?sslmode=disable" + ) + client = databend_driver.BlockingDatabendClient(dsn) + context.conn = client.get_conn() + + +@when("Create a test table") +def _(context): + context.conn.exec("DROP TABLE IF EXISTS test") + context.conn.exec( + """ + CREATE TABLE test ( + i64 Int64, + u64 UInt64, + f64 Float64, + s String, + s2 String, + d Date, + t DateTime + ) + """ + ) + + +@then("Select string {input} should be equal to {output}") +def _(context, input, output): + row = context.conn.query_row(f"SELECT '{input}'") + value = row.values()[0] + assert output == value + + +@then("Select numbers should iterate all rows") +def _(context): + rows = context.conn.query_iter("SELECT number FROM numbers(5)") + ret = [] + for row in rows: + ret.append(row.values()[0]) + expected = [0, 1, 2, 3, 4] + assert ret == expected + + +@then("Insert and Select should be equal") +def _(context): + context.conn.exec( + """ + INSERT INTO test VALUES + (-1, 1, 1.0, '1', '1', '2011-03-06', '2011-03-06 06:20:00'), + (-2, 2, 2.0, '2', '2', '2012-05-31', '2012-05-31 11:20:00'), + (-3, 3, 3.0, '3', '2', '2016-04-04', '2016-04-04 11:30:00') + """ + ) + rows = context.conn.query_iter("SELECT * FROM test") + ret = [] + for row in rows: + ret.append(row.values()) + expected = [ + (-1, 1, 1.0, "1", "1", "2011-03-06", "2011-03-06 06:20:00"), + (-2, 2, 2.0, "2", "2", "2012-05-31", "2012-05-31 11:20:00"), + (-3, 3, 3.0, "3", "2", "2016-04-04", "2016-04-04 11:30:00"), + ] + assert ret == expected + + +@then("Stream load and Select should be equal") +def _(context): + values = [ + ["-1", "1", "1.0", "1", "1", "2011-03-06", "2011-03-06T06:20:00Z"], + ["-2", "2", "2.0", "2", "2", "2012-05-31", "2012-05-31T11:20:00Z"], + ["-3", "3", "3.0", "3", "2", "2016-04-04", "2016-04-04T11:30:00Z"], + ] + progress = context.conn.stream_load("INSERT INTO test VALUES", values) + assert progress.write_rows == 3 + assert progress.write_bytes == 185 + + rows 
= context.conn.query_iter("SELECT * FROM test") + ret = [] + for row in rows: + ret.append(row.values()) + expected = [ + (-1, 1, 1.0, "1", "1", "2011-03-06", "2011-03-06 06:20:00"), + (-2, 2, 2.0, "2", "2", "2012-05-31", "2012-05-31 11:20:00"), + (-3, 3, 3.0, "3", "2", "2016-04-04", "2016-04-04 11:30:00"), + ] + print("==>", ret) + assert ret == expected diff --git a/tests/Makefile b/tests/Makefile index bdb1b69c5..3b061853a 100644 --- a/tests/Makefile +++ b/tests/Makefile @@ -22,7 +22,7 @@ test-bendsql: up cd .. && ./cli/test.sh flight test-bindings-python: up - cd ../bindings/python && pipenv run behave tests + cd ../bindings/python && pipenv run behave tests/* test-bindings-nodejs: up cd ../bindings/nodejs && yarn test
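As a quick end-to-end check of the new blocking API, something along these lines should work against the same local Databend instance the behave tests use (a minimal sketch, not part of the patch; the DSN and the `numbers()` table function are the ones already used in the tests above):

```python
from databend_driver import BlockingDatabendClient

# Same DSN as TEST_DATABEND_DSN in the behave steps; assumes `make up` was run in tests/.
client = BlockingDatabendClient("databend+http://root:root@localhost:8000/?sslmode=disable")
conn = client.get_conn()
print(conn.version())

rows = conn.query_iter("SELECT number, number + 1 FROM numbers(3)")
# The schema is available on the iterator before consuming it.
for field in rows.schema().fields():
    print(field.name, field.data_type)
for row in rows:
    print(row.values())
```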