Skip to content

Commit

Permalink
feat: support argument presign=auto/detect/on/off
Browse files Browse the repository at this point in the history
  • Loading branch information
everpcpc committed Jan 24, 2024
1 parent af1adf9 commit a31a637
Show file tree
Hide file tree
Showing 6 changed files with 78 additions and 25 deletions.
9 changes: 3 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ databend[+flight]://user:[password]@host[:port]/[database][?sslmode=disable][&ar

Examples:

- `databend://root:@localhost:8000/?sslmode=disable`
- `databend://root:@localhost:8000/?sslmode=disable&presign=detect`

- `databend://user1:password1@tnxxxx--default.gw.aws-us-east-2.default.databend.com:443/benchmark?enable_dphyp=1`

Expand All @@ -187,16 +187,13 @@ RestAPI client:

| Arg | Description |
|---|---|
| `presigned_url_disabled` | Set to `1` to disable presigned upload to object storage, *should only be used with local testing environment* |
| `presigned_url_disabled` | Set to `1` to disable presigned upload to object storage, *deprecated, use `presign` instead* |
| `wait_time_secs` | Request wait time for page, default to `1` |
| `max_rows_in_buffer` | Max rows for page buffer |
| `max_rows_per_page` | Max response rows for a single page |
| `page_request_timeout_secs` | Timeout for a single page request, default to `30` |
| `presign` | Whether to enable presign for data loading, available arguments are auto/detect/on/off, default to `auto` which only enable presign for `Databend Cloud` |

Example to disable presign using set
```
bendsql --set presigned_url_disabled=1
```

FlightSQL client:

Expand Down
2 changes: 1 addition & 1 deletion cli/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ impl Session {
Err(_) => {
eprintln!("-> WARN: Backend storage dose not support presigned url.");
eprintln!(" Loading data from local file may not work as expected.");
eprintln!(" Be aware of data transfer cost with argument `presigned_url_disabled=1`.");
eprintln!(" Be aware of data transfer cost with arg `presign=off`.");
}
Ok(resp) => {
let now_utc = chrono::Utc::now();
Expand Down
68 changes: 57 additions & 11 deletions core/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use tokio_util::io::ReaderStream;
use url::Url;

use crate::auth::{AccessTokenAuth, AccessTokenFileAuth, Auth, BasicAuth};
use crate::presign::{presign_upload_to_stage, PresignedResponse, Reader};
use crate::presign::{presign_upload_to_stage, PresignMode, PresignedResponse, Reader};
use crate::stage::StageLocation;
use crate::{
error::{Error, Result},
Expand Down Expand Up @@ -68,7 +68,7 @@ pub struct APIClient {

tls_ca_file: Option<String>,

presigned_url_disabled: bool,
presign: PresignMode,
}

impl APIClient {
Expand Down Expand Up @@ -111,9 +111,10 @@ impl APIClient {
};
}
"presigned_url_disabled" => {
client.presigned_url_disabled = match v.as_ref() {
"true" | "1" => true,
"false" | "0" => false,
warn!("presigned_url_disabled is deprecated, please use presign=auto/detect/on/off in DSN instead");
client.presign = match v.as_ref() {
"true" | "1" => PresignMode::On,
"false" | "0" => PresignMode::Off,
_ => {
return Err(Error::BadArgument(format!(
"Invalid value for presigned_url_disabled: {}",
Expand All @@ -122,6 +123,20 @@ impl APIClient {
}
}
}
"presign" => {
client.presign = match v.as_ref() {
"auto" => PresignMode::Auto,
"detect" => PresignMode::Detect,
"on" => PresignMode::On,
"off" => PresignMode::Off,
_ => {
return Err(Error::BadArgument(format!(
"Invalid value for presign: {}, should be one of auto/detect/on/off",
v
)))
}
}
}
"tenant" => {
client.tenant = Some(v.to_string());
}
Expand Down Expand Up @@ -182,9 +197,33 @@ impl APIClient {
.with_role(role)
.with_database(database),
));

client.init_presign().await?;

Ok(client)
}

async fn init_presign(&mut self) -> Result<()> {
match self.presign {
PresignMode::Auto => {
if self.host.ends_with(".databend.com") || self.host.ends_with(".databend.cn") {
self.presign = PresignMode::On;
} else {
self.presign = PresignMode::Off;
}
}
PresignMode::Detect => match self.get_presigned_upload_url("~/.bendsql/check").await {
Ok(_) => self.presign = PresignMode::On,
Err(e) => {
warn!("presign mode off with error detected: {}", e);
self.presign = PresignMode::Off;
}
},
_ => {}
}
Ok(())
}

pub async fn current_warehouse(&self) -> Option<String> {
let guard = self.warehouse.lock().await;
guard.clone()
Expand Down Expand Up @@ -481,11 +520,18 @@ impl APIClient {
}

pub async fn upload_to_stage(&self, stage: &str, data: Reader, size: u64) -> Result<()> {
if self.presigned_url_disabled {
self.upload_to_stage_with_stream(stage, data, size).await
} else {
let presigned = self.get_presigned_upload_url(stage).await?;
presign_upload_to_stage(presigned, data, size).await
match self.presign {
PresignMode::Off => self.upload_to_stage_with_stream(stage, data, size).await,
PresignMode::On => {
let presigned = self.get_presigned_upload_url(stage).await?;
presign_upload_to_stage(presigned, data, size).await
}
PresignMode::Auto => {
unreachable!("PresignMode::Auto should be handled during client initialization")
}
PresignMode::Detect => {
unreachable!("PresignMode::Detect should be handled during client initialization")
}
}
}

Expand Down Expand Up @@ -536,7 +582,7 @@ impl Default for APIClient {
max_rows_per_page: None,
page_request_timeout: Duration::from_secs(30),
tls_ca_file: None,
presigned_url_disabled: false,
presign: PresignMode::Auto,
}
}
}
Expand Down
8 changes: 8 additions & 0 deletions core/src/presign.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,14 @@ use crate::error::{Error, Result};

pub type Reader = Box<dyn AsyncRead + Send + Sync + Unpin + 'static>;

#[derive(Debug, Clone)]
pub enum PresignMode {
Auto,
Detect,
On,
Off,
}

pub struct PresignedResponse {
pub method: String,
pub headers: BTreeMap<String, String>,
Expand Down
12 changes: 7 additions & 5 deletions core/tests/core/stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,14 @@ use databend_client::APIClient;

use crate::common::DEFAULT_DSN;

async fn insert_with_stage(presigned: bool) {
async fn insert_with_stage(presign: bool) {
let dsn = option_env!("TEST_DATABEND_DSN").unwrap_or(DEFAULT_DSN);
let client = if presigned {
APIClient::from_dsn(dsn).await.unwrap()
let client = if presign {
APIClient::from_dsn(&format!("{}&presign=on", dsn))
.await
.unwrap()
} else {
APIClient::from_dsn(&format!("{}&presigned_url_disabled=1", dsn))
APIClient::from_dsn(&format!("{}&presign=off", dsn))
.await
.unwrap()
};
Expand All @@ -33,7 +35,7 @@ async fn insert_with_stage(presigned: bool) {

let path = chrono::Utc::now().format("%Y%m%d%H%M%S%9f").to_string();
let stage_location = format!("@~/{}/sample.csv", path);
let table = if presigned {
let table = if presign {
format!("sample_insert_presigned_{}", path)
} else {
format!("sample_insert_stream_{}", path)
Expand Down
4 changes: 2 additions & 2 deletions driver/tests/driver/load.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@ use crate::common::DEFAULT_DSN;
async fn prepare_client(presigned: bool) -> Option<Client> {
let dsn = option_env!("TEST_DATABEND_DSN").unwrap_or(DEFAULT_DSN);
let client = if presigned {
Client::new(dsn.to_string())
Client::new(format!("{}&presign=on", dsn))
} else {
Client::new(format!("{}&presigned_url_disabled=1", dsn))
Client::new(format!("{}&presign=off", dsn))
};
let conn = client.get_conn().await.unwrap();
let info = conn.info().await;
Expand Down

0 comments on commit a31a637

Please sign in to comment.