From 8e3443457fc7bfbb64805cd9d9b31b9b78e7bed0 Mon Sep 17 00:00:00 2001 From: everpcpc Date: Wed, 15 Nov 2023 15:00:47 +0800 Subject: [PATCH] feat: add log for query & page --- cli/Cargo.toml | 3 +++ cli/src/main.rs | 13 ++++++++++ cli/src/trace.rs | 58 ++++++++++++++++++++++++++++++++++++++++++ core/Cargo.toml | 1 + core/src/client.rs | 12 +++++++++ driver/Cargo.toml | 1 + driver/src/rest_api.rs | 15 +++++++++++ 7 files changed, 103 insertions(+) create mode 100644 cli/src/trace.rs diff --git a/cli/Cargo.toml b/cli/Cargo.toml index 28f40c017..06fbe243c 100644 --- a/cli/Cargo.toml +++ b/cli/Cargo.toml @@ -19,7 +19,9 @@ chrono = { version = "0.4.31", default-features = false, features = ["clock"] } clap = { version = "4.3", features = ["derive", "env"] } comfy-table = "7.0" csv = "1.2" +fern = "0.6" indicatif = "0.17" +log = "0.4" logos = "0.13" once_cell = "1.18" rustyline = "12.0" @@ -37,6 +39,7 @@ tokio = { version = "1.28", features = [ ] } tokio-stream = "0.1" toml = "0.8" +tracing-appender = "0.2" unicode-segmentation = "1.10" url = { version = "2.4", default-features = false } diff --git a/cli/src/main.rs b/cli/src/main.rs index 2dbd45245..ebe0c9c3b 100644 --- a/cli/src/main.rs +++ b/cli/src/main.rs @@ -19,6 +19,7 @@ mod config; mod display; mod helper; mod session; +mod trace; use std::{ collections::BTreeMap, @@ -29,6 +30,7 @@ use crate::config::OutputQuoteStyle; use anyhow::{anyhow, Result}; use clap::{ArgAction, CommandFactory, Parser, ValueEnum}; use config::{Config, OutputFormat, Settings, TimeOption}; +use log::info; use once_cell::sync::Lazy; static VERSION: Lazy = Lazy::new(|| { @@ -172,6 +174,9 @@ struct Args { help = "Only show execution time without results, will implicitly set output format to `null`." )] time: Option, + + #[clap(short = 'l', default_value = "info", long)] + log_level: String, } /// Parse a single key-value pair @@ -334,6 +339,14 @@ pub async fn main() -> Result<()> { let mut session = session::Session::try_new(dsn, settings, is_repl).await?; + let log_dir = format!( + "{}/.bendsql", + std::env::var("HOME").unwrap_or_else(|_| ".".to_string()) + ); + + let _guards = trace::init_logging(&log_dir, &args.log_level).await?; + info!("bendsql version: {:?}", VERSION); + if is_repl { session.handle_repl().await; return Ok(()); diff --git a/cli/src/trace.rs b/cli/src/trace.rs new file mode 100644 index 000000000..59ac624a4 --- /dev/null +++ b/cli/src/trace.rs @@ -0,0 +1,58 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use log::LevelFilter; +use std::io::BufWriter; +use std::io::Write; +use std::str::FromStr; + +use anyhow::Result; +use tracing_appender::rolling::RollingFileAppender; +use tracing_appender::rolling::Rotation; + +#[allow(dyn_drop)] +pub async fn init_logging( + dir: &str, + level: &str, +) -> Result>> { + let mut guards: Vec> = Vec::new(); + let mut logger = fern::Dispatch::new(); + + let rolling = RollingFileAppender::new(Rotation::DAILY, dir, "bendsql"); + let (non_blocking, flush_guard) = tracing_appender::non_blocking(rolling); + let buffered_non_blocking = BufWriter::with_capacity(64 * 1024 * 1024, non_blocking); + + guards.push(Box::new(flush_guard)); + logger = logger.chain( + fern::Dispatch::new() + .format(|out, message, record| { + out.finish(format_args!( + "[{}] - {} - [{}] {}", + chrono::Local::now().to_rfc3339_opts(chrono::SecondsFormat::Secs, true), + record.level(), + record.target(), + message + )) + }) + .level(LevelFilter::from_str(level)?) + .chain(Box::new(buffered_non_blocking) as Box), + ); + + if logger.apply().is_err() { + eprintln!("logger has already been set"); + return Ok(Vec::new()); + } + + Ok(guards) +} diff --git a/core/Cargo.toml b/core/Cargo.toml index b8739f37b..9fda63855 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -20,6 +20,7 @@ native-tls = ["reqwest/native-tls"] [dependencies] http = "0.2" +log = "0.4" once_cell = "1.18" percent-encoding = "2.3" reqwest = { version = "0.11", default-features = false, features = ["json", "multipart", "stream"] } diff --git a/core/src/client.rs b/core/src/client.rs index c78a73d48..8a0844d88 100644 --- a/core/src/client.rs +++ b/core/src/client.rs @@ -17,6 +17,7 @@ use std::sync::Arc; use std::time::Duration; use http::StatusCode; +use log::info; use once_cell::sync::Lazy; use percent_encoding::percent_decode_str; use reqwest::header::HeaderMap; @@ -206,6 +207,7 @@ impl APIClient { } pub async fn start_query(&self, sql: &str) -> Result { + info!("start query: {}", sql); let session_settings = self.make_session().await; let req = QueryRequest::new(sql) .with_pagination(self.make_pagination()) @@ -253,6 +255,7 @@ impl APIClient { } pub async fn query_page(&self, query_id: &str, next_uri: &str) -> Result { + info!("query page: {}", next_uri); let endpoint = self.endpoint.join(next_uri)?; let headers = self.make_headers(query_id).await?; let retry_strategy = ExponentialBackoff::from_millis(10).map(jitter).take(3); @@ -286,6 +289,7 @@ impl APIClient { } pub async fn kill_query(&self, query_id: &str, kill_uri: &str) -> Result<()> { + info!("kill query: {}", kill_uri); let endpoint = self.endpoint.join(kill_uri)?; let headers = self.make_headers(query_id).await?; let resp = self @@ -307,6 +311,7 @@ impl APIClient { } pub async fn wait_for_query(&self, resp: QueryResponse) -> Result { + info!("wait for query: {}", resp.id); if let Some(next_uri) = &resp.next_uri { let schema = resp.schema; let mut data = resp.data; @@ -324,6 +329,7 @@ impl APIClient { } pub async fn query(&self, sql: &str) -> Result { + info!("query: {}", sql); let resp = self.start_query(sql).await?; self.wait_for_query(resp).await } @@ -388,6 +394,10 @@ impl APIClient { file_format_options: BTreeMap<&str, &str>, copy_options: BTreeMap<&str, &str>, ) -> Result { + info!( + "insert with stage: {}, format: {:?}, copy: {:?}", + sql, file_format_options, copy_options + ); let session_settings = self.make_session().await; let stage_attachment = Some(StageAttachmentConfig { location: stage, @@ -440,6 +450,7 @@ impl APIClient { } async fn get_presigned_upload_url(&self, stage: &str) -> Result { + info!("get presigned upload url: {}", stage); let sql = format!("PRESIGN UPLOAD {}", stage); let resp = self.query(&sql).await?; if resp.data.len() != 1 { @@ -486,6 +497,7 @@ impl APIClient { data: Reader, size: u64, ) -> Result<()> { + info!("upload to stage with stream: {}, size: {}", stage, size); let endpoint = self.endpoint.join("v1/upload_to_stage")?; let location = StageLocation::try_from(stage)?; let query_id = self.gen_query_id(); diff --git a/driver/Cargo.toml b/driver/Cargo.toml index 43facdcb9..3acd4e520 100644 --- a/driver/Cargo.toml +++ b/driver/Cargo.toml @@ -35,6 +35,7 @@ chrono = { version = "0.4.31", default-features = false, features = ["clock"] } csv = "1.3" dyn-clone = "1.0" glob = "0.3" +log = "0.4" percent-encoding = "2.3" serde_json = { version = "1.0", default-features = false, features = ["std"] } tokio = { version = "1.28", features = ["macros"] } diff --git a/driver/src/rest_api.rs b/driver/src/rest_api.rs index 97ae0fd63..81f085d60 100644 --- a/driver/src/rest_api.rs +++ b/driver/src/rest_api.rs @@ -20,6 +20,7 @@ use std::sync::Arc; use std::task::{Context, Poll}; use async_trait::async_trait; +use log::info; use tokio::fs::File; use tokio_stream::Stream; @@ -51,6 +52,7 @@ impl Connection for RestAPIConnection { } async fn exec(&self, sql: &str) -> Result { + info!("exec: {}", sql); let mut resp = self.client.start_query(sql).await?; while let Some(next_uri) = resp.next_uri { resp = self.client.query_page(&resp.id, &next_uri).await?; @@ -59,18 +61,21 @@ impl Connection for RestAPIConnection { } async fn query_iter(&self, sql: &str) -> Result { + info!("query iter: {}", sql); let rows_with_progress = self.query_iter_ext(sql).await?; let rows = rows_with_progress.filter_rows().await; Ok(rows) } async fn query_iter_ext(&self, sql: &str) -> Result { + info!("query iter ext: {}", sql); let resp = self.client.start_query(sql).await?; let (schema, rows) = RestAPIRows::from_response(self.client.clone(), resp)?; Ok(RowStatsIterator::new(Arc::new(schema), Box::pin(rows))) } async fn query_row(&self, sql: &str) -> Result> { + info!("query row: {}", sql); let resp = self.client.start_query(sql).await?; let resp = self.wait_for_data(resp).await?; match resp.kill_uri { @@ -91,6 +96,7 @@ impl Connection for RestAPIConnection { } async fn get_presigned_url(&self, operation: &str, stage: &str) -> Result { + info!("get presigned url: {} {}", operation, stage); let sql = format!("PRESIGN {} {}", operation, stage); let row = self.query_row(&sql).await?.ok_or(Error::InvalidResponse( "Empty response from server for presigned request".to_string(), @@ -118,6 +124,10 @@ impl Connection for RestAPIConnection { file_format_options: Option>, copy_options: Option>, ) -> Result { + info!( + "load data: {}, size: {}, format: {:?}, copy: {:?}", + sql, size, file_format_options, copy_options + ); let now = chrono::Utc::now() .timestamp_nanos_opt() .ok_or_else(|| Error::IO("Failed to get current timestamp".to_string()))?; @@ -140,6 +150,10 @@ impl Connection for RestAPIConnection { mut format_options: BTreeMap<&str, &str>, copy_options: Option>, ) -> Result { + info!( + "load file: {}, file: {:?}, format: {:?}, copy: {:?}", + sql, fp, format_options, copy_options + ); let file = File::open(fp).await?; let metadata = file.metadata().await?; let data = Box::new(file); @@ -157,6 +171,7 @@ impl Connection for RestAPIConnection { } async fn stream_load(&self, sql: &str, data: Vec>) -> Result { + info!("stream load: {}, length: {:?}", sql, data.len()); let mut wtr = csv::WriterBuilder::new().from_writer(vec![]); for row in data { wtr.write_record(row)