Skip to content

Commit

Permalink
feat(bindings/python): add blocking client (#308)
Browse files Browse the repository at this point in the history
  • Loading branch information
everpcpc authored Nov 20, 2023
1 parent 1aa2c36 commit 7718530
Show file tree
Hide file tree
Showing 11 changed files with 693 additions and 317 deletions.
65 changes: 62 additions & 3 deletions bindings/python/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,34 @@ maturin develop

## Usage


### Blocking

```python
from databend_driver import BlockingDatabendClient

client = BlockingDatabendClient('databend+http://root:root@localhost:8000/?sslmode=disable')
conn = client.get_conn()
conn.exec(
"""
CREATE TABLE test (
i64 Int64,
u64 UInt64,
f64 Float64,
s String,
s2 String,
d Date,
t DateTime
)
"""
)
rows = conn.query_iter("SELECT * FROM test")
for row in rows:
print(row.values())
```

### Asyncio

```python
import asyncio
from databend_driver import AsyncDatabendClient
Expand Down Expand Up @@ -58,6 +86,27 @@ class AsyncDatabendConnection:
async def stream_load(self, sql: str, data: list[list[str]]) -> ServerStats: ...
```

### BlockingDatabendClient

```python
class BlockingDatabendClient:
def __init__(self, dsn: str): ...
def get_conn(self) -> BlockingDatabendConnection: ...
```

### BlockingDatabendConnection

```python
class BlockingDatabendConnection:
def info(self) -> ConnectionInfo: ...
def version(self) -> str: ...
def exec(self, sql: str) -> int: ...
def query_row(self, sql: str) -> Row: ...
def query_iter(self, sql: str) -> RowIterator: ...
def stream_load(self, sql: str, data: list[list[str]]) -> ServerStats: ...
```


### Row

```python
Expand All @@ -69,9 +118,13 @@ class Row:

```python
class RowIterator:
def schema(self) -> Schema: ...

def __iter__(self) -> RowIterator: ...
def __next__(self) -> Row: ...

def __aiter__(self) -> RowIterator: ...
async def __anext__(self) -> Row: ...
def schema(self) -> Schema: ...
```

### Field
Expand Down Expand Up @@ -131,8 +184,14 @@ class ConnectionInfo:

## Development

```
cd tests
make up
```

```shell
cd bindings/python
pipenv install --dev
maturin develop
pipenv run behave tests
pipenv run maturin develop
pipenv run behave tests/*
```
106 changes: 106 additions & 0 deletions bindings/python/src/asyncio.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
// Copyright 2021 Datafuse Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use pyo3::exceptions::PyException;
use pyo3::prelude::*;
use pyo3_asyncio::tokio::future_into_py;

use crate::types::{ConnectionInfo, Row, RowIterator, ServerStats};

#[pyclass(module = "databend_driver")]
pub struct AsyncDatabendClient(databend_driver::Client);

#[pymethods]
impl AsyncDatabendClient {
#[new]
#[pyo3(signature = (dsn))]
pub fn new(dsn: String) -> PyResult<Self> {
let client = databend_driver::Client::new(dsn);
Ok(Self(client))
}

pub fn get_conn<'p>(&'p self, py: Python<'p>) -> PyResult<&'p PyAny> {
let this = self.0.clone();
future_into_py(py, async move {
let conn = this.get_conn().await.unwrap();
Ok(AsyncDatabendConnection(conn))
})
}
}

#[pyclass(module = "databend_driver")]
pub struct AsyncDatabendConnection(Box<dyn databend_driver::Connection>);

#[pymethods]
impl AsyncDatabendConnection {
pub fn info<'p>(&'p self, py: Python<'p>) -> PyResult<&'p PyAny> {
let this = self.0.clone();
future_into_py(py, async move {
let info = this.info().await;
Ok(ConnectionInfo::new(info))
})
}

pub fn version<'p>(&'p self, py: Python<'p>) -> PyResult<&'p PyAny> {
let this = self.0.clone();
future_into_py(py, async move {
let version = this.version().await.unwrap();
Ok(version)
})
}

pub fn exec<'p>(&'p self, py: Python<'p>, sql: String) -> PyResult<&'p PyAny> {
let this = self.0.clone();
future_into_py(py, async move {
let res = this.exec(&sql).await.unwrap();
Ok(res)
})
}

pub fn query_row<'p>(&'p self, py: Python<'p>, sql: String) -> PyResult<&'p PyAny> {
let this = self.0.clone();
future_into_py(py, async move {
let row = this.query_row(&sql).await.unwrap();
Ok(Row::new(row.unwrap()))
})
}

pub fn query_iter<'p>(&'p self, py: Python<'p>, sql: String) -> PyResult<&'p PyAny> {
let this = self.0.clone();
future_into_py(py, async move {
let streamer = this.query_iter(&sql).await.unwrap();
Ok(RowIterator::new(streamer))
})
}

pub fn stream_load<'p>(
&self,
py: Python<'p>,
sql: String,
data: Vec<Vec<String>>,
) -> PyResult<&'p PyAny> {
let this = self.0.clone();
future_into_py(py, async move {
let data = data
.iter()
.map(|v| v.iter().map(|s| s.as_ref()).collect())
.collect();
let ss = this
.stream_load(&sql, data)
.await
.map_err(|e| PyException::new_err(format!("{}", e)))?;
Ok(ServerStats::new(ss))
})
}
}
114 changes: 114 additions & 0 deletions bindings/python/src/blocking.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
// Copyright 2021 Datafuse Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use pyo3::prelude::*;

use crate::types::{ConnectionInfo, Row, RowIterator, ServerStats};

#[pyclass(module = "databend_driver")]
pub struct BlockingDatabendClient(databend_driver::Client);

#[pymethods]
impl BlockingDatabendClient {
#[new]
#[pyo3(signature = (dsn))]
pub fn new(dsn: String) -> PyResult<Self> {
let client = databend_driver::Client::new(dsn);
Ok(Self(client))
}

pub fn get_conn(&self) -> PyResult<BlockingDatabendConnection> {
let this = self.0.clone();
let rt = tokio::runtime::Runtime::new()?;
let ret = rt.block_on(async move {
let conn = this.get_conn().await.unwrap();
conn
});
Ok(BlockingDatabendConnection(ret))
}
}

#[pyclass(module = "databend_driver")]
pub struct BlockingDatabendConnection(Box<dyn databend_driver::Connection>);

#[pymethods]
impl BlockingDatabendConnection {
pub fn info(&self) -> PyResult<ConnectionInfo> {
let this = self.0.clone();
let rt = tokio::runtime::Runtime::new()?;
let ret = rt.block_on(async move {
let info = this.info().await;
info
});
Ok(ConnectionInfo::new(ret))
}

pub fn version(&self) -> PyResult<String> {
let this = self.0.clone();
let rt = tokio::runtime::Runtime::new()?;
let ret = rt.block_on(async move {
let version = this.version().await.unwrap();
version
});
Ok(ret)
}

pub fn exec(&self, sql: String) -> PyResult<i64> {
let this = self.0.clone();
let rt = tokio::runtime::Runtime::new()?;
let ret = rt.block_on(async move {
let res = this.exec(&sql).await.unwrap();
res
});
Ok(ret)
}

pub fn query_row(&self, sql: String) -> PyResult<Option<Row>> {
let this = self.0.clone();
let rt = tokio::runtime::Runtime::new()?;
let ret = rt.block_on(async move {
let row = this.query_row(&sql).await.unwrap();
row
});
Ok(ret.map(Row::new))
}

pub fn query_iter(&self, sql: String) -> PyResult<RowIterator> {
let this = self.0.clone();
// future_into_py(py, async move {
// let streamer = this.query_iter(&sql).await.unwrap();
// Ok(RowIterator::new(streamer))
// })
let rt = tokio::runtime::Runtime::new()?;
let ret = rt.block_on(async move {
let streamer = this.query_iter(&sql).await.unwrap();
streamer
});
Ok(RowIterator::new(ret))
}

pub fn stream_load(&self, sql: String, data: Vec<Vec<String>>) -> PyResult<ServerStats> {
let this = self.0.clone();
let rt = tokio::runtime::Runtime::new()?;
let ret = rt.block_on(async move {
let data = data
.iter()
.map(|v| v.iter().map(|s| s.as_ref()).collect())
.collect();
let ss = this.stream_load(&sql, data).await.unwrap();
ss
});
Ok(ServerStats::new(ret))
}
}
Loading

0 comments on commit 7718530

Please sign in to comment.