Skip to content

Commit

Permalink
feat: add log for query & page
Browse files Browse the repository at this point in the history
  • Loading branch information
everpcpc committed Nov 15, 2023
1 parent 8888943 commit 8e34434
Show file tree
Hide file tree
Showing 7 changed files with 103 additions and 0 deletions.
3 changes: 3 additions & 0 deletions cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ chrono = { version = "0.4.31", default-features = false, features = ["clock"] }
clap = { version = "4.3", features = ["derive", "env"] }
comfy-table = "7.0"
csv = "1.2"
fern = "0.6"
indicatif = "0.17"
log = "0.4"
logos = "0.13"
once_cell = "1.18"
rustyline = "12.0"
Expand All @@ -37,6 +39,7 @@ tokio = { version = "1.28", features = [
] }
tokio-stream = "0.1"
toml = "0.8"
tracing-appender = "0.2"
unicode-segmentation = "1.10"
url = { version = "2.4", default-features = false }

Expand Down
13 changes: 13 additions & 0 deletions cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ mod config;
mod display;
mod helper;
mod session;
mod trace;

use std::{
collections::BTreeMap,
Expand All @@ -29,6 +30,7 @@ use crate::config::OutputQuoteStyle;
use anyhow::{anyhow, Result};
use clap::{ArgAction, CommandFactory, Parser, ValueEnum};
use config::{Config, OutputFormat, Settings, TimeOption};
use log::info;
use once_cell::sync::Lazy;

static VERSION: Lazy<String> = Lazy::new(|| {
Expand Down Expand Up @@ -172,6 +174,9 @@ struct Args {
help = "Only show execution time without results, will implicitly set output format to `null`."
)]
time: Option<TimeOption>,

#[clap(short = 'l', default_value = "info", long)]
log_level: String,
}

/// Parse a single key-value pair
Expand Down Expand Up @@ -334,6 +339,14 @@ pub async fn main() -> Result<()> {

let mut session = session::Session::try_new(dsn, settings, is_repl).await?;

let log_dir = format!(
"{}/.bendsql",
std::env::var("HOME").unwrap_or_else(|_| ".".to_string())
);

let _guards = trace::init_logging(&log_dir, &args.log_level).await?;
info!("bendsql version: {:?}", VERSION);

if is_repl {
session.handle_repl().await;
return Ok(());
Expand Down
58 changes: 58 additions & 0 deletions cli/src/trace.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
// Copyright 2021 Datafuse Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use log::LevelFilter;
use std::io::BufWriter;
use std::io::Write;
use std::str::FromStr;

use anyhow::Result;
use tracing_appender::rolling::RollingFileAppender;
use tracing_appender::rolling::Rotation;

#[allow(dyn_drop)]
pub async fn init_logging(
dir: &str,
level: &str,
) -> Result<Vec<Box<dyn Drop + Send + Sync + 'static>>> {
let mut guards: Vec<Box<dyn Drop + Send + Sync + 'static>> = Vec::new();
let mut logger = fern::Dispatch::new();

let rolling = RollingFileAppender::new(Rotation::DAILY, dir, "bendsql");
let (non_blocking, flush_guard) = tracing_appender::non_blocking(rolling);
let buffered_non_blocking = BufWriter::with_capacity(64 * 1024 * 1024, non_blocking);

guards.push(Box::new(flush_guard));
logger = logger.chain(
fern::Dispatch::new()
.format(|out, message, record| {
out.finish(format_args!(
"[{}] - {} - [{}] {}",
chrono::Local::now().to_rfc3339_opts(chrono::SecondsFormat::Secs, true),
record.level(),
record.target(),
message
))
})
.level(LevelFilter::from_str(level)?)
.chain(Box::new(buffered_non_blocking) as Box<dyn Write + Send>),
);

if logger.apply().is_err() {
eprintln!("logger has already been set");
return Ok(Vec::new());
}

Ok(guards)
}
1 change: 1 addition & 0 deletions core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ native-tls = ["reqwest/native-tls"]

[dependencies]
http = "0.2"
log = "0.4"
once_cell = "1.18"
percent-encoding = "2.3"
reqwest = { version = "0.11", default-features = false, features = ["json", "multipart", "stream"] }
Expand Down
12 changes: 12 additions & 0 deletions core/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use std::sync::Arc;
use std::time::Duration;

use http::StatusCode;
use log::info;
use once_cell::sync::Lazy;
use percent_encoding::percent_decode_str;
use reqwest::header::HeaderMap;
Expand Down Expand Up @@ -206,6 +207,7 @@ impl APIClient {
}

pub async fn start_query(&self, sql: &str) -> Result<QueryResponse> {
info!("start query: {}", sql);
let session_settings = self.make_session().await;
let req = QueryRequest::new(sql)
.with_pagination(self.make_pagination())
Expand Down Expand Up @@ -253,6 +255,7 @@ impl APIClient {
}

pub async fn query_page(&self, query_id: &str, next_uri: &str) -> Result<QueryResponse> {
info!("query page: {}", next_uri);
let endpoint = self.endpoint.join(next_uri)?;
let headers = self.make_headers(query_id).await?;
let retry_strategy = ExponentialBackoff::from_millis(10).map(jitter).take(3);
Expand Down Expand Up @@ -286,6 +289,7 @@ impl APIClient {
}

pub async fn kill_query(&self, query_id: &str, kill_uri: &str) -> Result<()> {
info!("kill query: {}", kill_uri);
let endpoint = self.endpoint.join(kill_uri)?;
let headers = self.make_headers(query_id).await?;
let resp = self
Expand All @@ -307,6 +311,7 @@ impl APIClient {
}

pub async fn wait_for_query(&self, resp: QueryResponse) -> Result<QueryResponse> {
info!("wait for query: {}", resp.id);
if let Some(next_uri) = &resp.next_uri {
let schema = resp.schema;
let mut data = resp.data;
Expand All @@ -324,6 +329,7 @@ impl APIClient {
}

pub async fn query(&self, sql: &str) -> Result<QueryResponse> {
info!("query: {}", sql);
let resp = self.start_query(sql).await?;
self.wait_for_query(resp).await
}
Expand Down Expand Up @@ -388,6 +394,10 @@ impl APIClient {
file_format_options: BTreeMap<&str, &str>,
copy_options: BTreeMap<&str, &str>,
) -> Result<QueryResponse> {
info!(
"insert with stage: {}, format: {:?}, copy: {:?}",
sql, file_format_options, copy_options
);
let session_settings = self.make_session().await;
let stage_attachment = Some(StageAttachmentConfig {
location: stage,
Expand Down Expand Up @@ -440,6 +450,7 @@ impl APIClient {
}

async fn get_presigned_upload_url(&self, stage: &str) -> Result<PresignedResponse> {
info!("get presigned upload url: {}", stage);
let sql = format!("PRESIGN UPLOAD {}", stage);
let resp = self.query(&sql).await?;
if resp.data.len() != 1 {
Expand Down Expand Up @@ -486,6 +497,7 @@ impl APIClient {
data: Reader,
size: u64,
) -> Result<()> {
info!("upload to stage with stream: {}, size: {}", stage, size);
let endpoint = self.endpoint.join("v1/upload_to_stage")?;
let location = StageLocation::try_from(stage)?;
let query_id = self.gen_query_id();
Expand Down
1 change: 1 addition & 0 deletions driver/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ chrono = { version = "0.4.31", default-features = false, features = ["clock"] }
csv = "1.3"
dyn-clone = "1.0"
glob = "0.3"
log = "0.4"
percent-encoding = "2.3"
serde_json = { version = "1.0", default-features = false, features = ["std"] }
tokio = { version = "1.28", features = ["macros"] }
Expand Down
15 changes: 15 additions & 0 deletions driver/src/rest_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use std::sync::Arc;
use std::task::{Context, Poll};

use async_trait::async_trait;
use log::info;
use tokio::fs::File;
use tokio_stream::Stream;

Expand Down Expand Up @@ -51,6 +52,7 @@ impl Connection for RestAPIConnection {
}

async fn exec(&self, sql: &str) -> Result<i64> {
info!("exec: {}", sql);
let mut resp = self.client.start_query(sql).await?;
while let Some(next_uri) = resp.next_uri {
resp = self.client.query_page(&resp.id, &next_uri).await?;
Expand All @@ -59,18 +61,21 @@ impl Connection for RestAPIConnection {
}

async fn query_iter(&self, sql: &str) -> Result<RowIterator> {
info!("query iter: {}", sql);
let rows_with_progress = self.query_iter_ext(sql).await?;
let rows = rows_with_progress.filter_rows().await;
Ok(rows)
}

async fn query_iter_ext(&self, sql: &str) -> Result<RowStatsIterator> {
info!("query iter ext: {}", sql);
let resp = self.client.start_query(sql).await?;
let (schema, rows) = RestAPIRows::from_response(self.client.clone(), resp)?;
Ok(RowStatsIterator::new(Arc::new(schema), Box::pin(rows)))
}

async fn query_row(&self, sql: &str) -> Result<Option<Row>> {
info!("query row: {}", sql);
let resp = self.client.start_query(sql).await?;
let resp = self.wait_for_data(resp).await?;
match resp.kill_uri {
Expand All @@ -91,6 +96,7 @@ impl Connection for RestAPIConnection {
}

async fn get_presigned_url(&self, operation: &str, stage: &str) -> Result<PresignedResponse> {
info!("get presigned url: {} {}", operation, stage);
let sql = format!("PRESIGN {} {}", operation, stage);
let row = self.query_row(&sql).await?.ok_or(Error::InvalidResponse(
"Empty response from server for presigned request".to_string(),
Expand Down Expand Up @@ -118,6 +124,10 @@ impl Connection for RestAPIConnection {
file_format_options: Option<BTreeMap<&str, &str>>,
copy_options: Option<BTreeMap<&str, &str>>,
) -> Result<ServerStats> {
info!(
"load data: {}, size: {}, format: {:?}, copy: {:?}",
sql, size, file_format_options, copy_options
);
let now = chrono::Utc::now()
.timestamp_nanos_opt()
.ok_or_else(|| Error::IO("Failed to get current timestamp".to_string()))?;
Expand All @@ -140,6 +150,10 @@ impl Connection for RestAPIConnection {
mut format_options: BTreeMap<&str, &str>,
copy_options: Option<BTreeMap<&str, &str>>,
) -> Result<ServerStats> {
info!(
"load file: {}, file: {:?}, format: {:?}, copy: {:?}",
sql, fp, format_options, copy_options
);
let file = File::open(fp).await?;
let metadata = file.metadata().await?;
let data = Box::new(file);
Expand All @@ -157,6 +171,7 @@ impl Connection for RestAPIConnection {
}

async fn stream_load(&self, sql: &str, data: Vec<Vec<&str>>) -> Result<ServerStats> {
info!("stream load: {}, length: {:?}", sql, data.len());
let mut wtr = csv::WriterBuilder::new().from_writer(vec![]);
for row in data {
wtr.write_record(row)
Expand Down

0 comments on commit 8e34434

Please sign in to comment.