Skip to content

Commit

Permalink
feat: support use warehouse with cloud (#142)
Browse files Browse the repository at this point in the history
  • Loading branch information
everpcpc authored Jul 10, 2023
1 parent 4f95f8e commit ea59261
Show file tree
Hide file tree
Showing 6 changed files with 34 additions and 19 deletions.
2 changes: 1 addition & 1 deletion bindings/nodejs/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ crate-type = ["cdylib"]
doc = false

[dependencies]
databend-driver = { path = "../../driver", version = "0.2.24", features = ["rustls", "flight-sql"] }
databend-driver = { path = "../../driver", version = "0.3.0", features = ["rustls", "flight-sql"] }
futures = "0.3.28"
napi = { version = "2.13.2", default-features = false, features = [
"napi6",
Expand Down
4 changes: 2 additions & 2 deletions bindings/python/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,9 @@ doc = false

[dependencies]
chrono = { version = "0.4.24", default-features = false, features = ["std"] }
databend-client = { version = "0.2.0", path = "../../core" }
databend-driver = { path = "../../driver", version = "0.3.0", features = ["rustls", "flight-sql"] }
futures = "0.3.28"
databend-driver = { path = "../../driver", version = "0.2.20", features = ["rustls", "flight-sql"] }
databend-client = { version = "0.1.15", path = "../../core" }
pyo3 = { version = "0.18", features = ["abi3-py37"] }
pyo3-asyncio = { version = "0.18", features = ["tokio-runtime"] }
tokio = "1"
4 changes: 2 additions & 2 deletions cli/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "bendsql"
version = "0.3.12"
version = "0.4.0"
edition = "2021"
license = "Apache-2.0"
description = "Databend Native Command Line Tool"
Expand All @@ -15,7 +15,7 @@ chrono = { version = "0.4.24", default-features = false, features = ["clock"] }
clap = { version = "4.1.0", features = ["derive", "env"] }
comfy-table = "6.1.4"
csv = "1.2.1"
databend-driver = { path = "../driver", version = "0.2.23", features = ["rustls", "flight-sql"] }
databend-driver = { path = "../driver", version = "0.3.0", features = ["rustls", "flight-sql"] }
futures = { version = "0.3", default-features = false, features = ["alloc"] }
humantime-serde = "1.1.1"
indicatif = "0.17.3"
Expand Down
2 changes: 1 addition & 1 deletion core/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "databend-client"
version = "0.1.16"
version = "0.2.0"
edition = "2021"
license = "Apache-2.0"
description = "Databend Client for Rust"
Expand Down
37 changes: 26 additions & 11 deletions core/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ pub struct APIClient {
pub port: u16,

tenant: Option<String>,
warehouse: Option<String>,
warehouse: Arc<Mutex<Option<String>>>,
pub database: Arc<Mutex<Option<String>>>,
pub user: String,
password: Option<String>,
Expand Down Expand Up @@ -135,7 +135,7 @@ impl APIClient {
client.tenant = Some(v.to_string());
}
"warehouse" => {
client.warehouse = Some(v.to_string());
client.warehouse = Arc::new(Mutex::new(Some(v.to_string())));
}
"sslmode" => {
if v == "disable" {
Expand Down Expand Up @@ -167,12 +167,13 @@ impl APIClient {
.with_pagination(self.make_pagination())
.with_session(session_settings);
let endpoint = self.endpoint.join("v1/query")?;
let headers = self.make_headers().await?;
let resp = self
.cli
.post(endpoint)
.json(&req)
.basic_auth(self.user.clone(), self.password.clone())
.headers(self.make_headers()?)
.headers(headers)
.send()
.await?;
if resp.status() != StatusCode::OK {
Expand All @@ -194,7 +195,15 @@ impl APIClient {
}
if let Some(settings) = &session.settings {
for (k, v) in settings {
session_settings.insert(k.clone(), v.clone());
match k.as_str() {
"warehouse" => {
let mut warehouse = self.warehouse.lock().await;
*warehouse = Some(v.clone());
}
_ => {
session_settings.insert(k.clone(), v.clone());
}
}
}
}
}
Expand All @@ -203,11 +212,12 @@ impl APIClient {

pub async fn query_page(&self, next_uri: &str) -> Result<QueryResponse> {
let endpoint = self.endpoint.join(next_uri)?;
let headers = self.make_headers().await?;
let resp = self
.cli
.get(endpoint)
.basic_auth(self.user.clone(), self.password.clone())
.headers(self.make_headers()?)
.headers(headers)
.send()
.await?;
if resp.status() != StatusCode::OK {
Expand Down Expand Up @@ -289,12 +299,13 @@ impl APIClient {
Some(pagination)
}

fn make_headers(&self) -> Result<HeaderMap> {
async fn make_headers(&self) -> Result<HeaderMap> {
let mut headers = HeaderMap::new();
if let Some(tenant) = &self.tenant {
headers.insert("X-DATABEND-TENANT", tenant.parse()?);
}
if let Some(warehouse) = &self.warehouse {
let warehouse = self.warehouse.lock().await;
if let Some(warehouse) = &*warehouse {
headers.insert("X-DATABEND-WAREHOUSE", warehouse.parse()?);
}
Ok(headers)
Expand All @@ -318,12 +329,13 @@ impl APIClient {
.with_session(session_settings)
.with_stage_attachment(stage_attachment);
let endpoint = self.endpoint.join("v1/query")?;
let headers = self.make_headers().await?;
let resp = self
.cli
.post(endpoint)
.json(&req)
.basic_auth(self.user.clone(), self.password.clone())
.headers(self.make_headers()?)
.headers(headers)
.send()
.await?;
if resp.status() != StatusCode::OK {
Expand Down Expand Up @@ -361,7 +373,7 @@ impl APIClient {
) -> Result<()> {
let endpoint = self.endpoint.join("v1/upload_to_stage")?;
let location = StageLocation::try_from(stage_location)?;
let mut headers = self.make_headers()?;
let mut headers = self.make_headers().await?;
headers.insert("stage_name", location.name.parse()?);
let stream = Body::wrap_stream(ReaderStream::new(data));
let part = Part::stream_with_length(stream, size).file_name(location.path);
Expand Down Expand Up @@ -451,7 +463,7 @@ impl Default for APIClient {
host: "localhost".to_string(),
port: 8000,
tenant: None,
warehouse: None,
warehouse: Arc::new(Mutex::new(None)),
database: Arc::new(Mutex::new(None)),
user: "root".to_string(),
password: None,
Expand Down Expand Up @@ -484,7 +496,10 @@ mod test {
assert_eq!(client.max_rows_in_buffer, Some(5000000));
assert_eq!(client.max_rows_per_page, Some(10000));
assert_eq!(client.tenant, None);
assert_eq!(client.warehouse, Some("wh".to_string()));
assert_eq!(
*client.warehouse.try_lock().unwrap(),
Some("wh".to_string())
);
Ok(())
}

Expand Down
4 changes: 2 additions & 2 deletions driver/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "databend-driver"
version = "0.2.24"
version = "0.3.0"
edition = "2021"
license = "Apache-2.0"
description = "Databend Driver for Rust"
Expand All @@ -21,7 +21,7 @@ flight-sql = ["dep:arrow-array", "dep:arrow-cast", "dep:arrow-flight", "dep:arro
[dependencies]
async-trait = "0.1.68"
chrono = { version = "0.4.24", default-features = false, features = ["clock"] }
databend-client = { version = "0.1.16", path = "../core" }
databend-client = { version = "0.2.0", path = "../core" }
dyn-clone = "1.0.11"
http = "0.2.9"
percent-encoding = "2.2.0"
Expand Down

0 comments on commit ea59261

Please sign in to comment.