diff --git a/Cargo.toml b/Cargo.toml index 36ef29b..19c6694 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,10 +13,10 @@ version = "0.0.4" shlex = "1.1.0" isatty = "0.1.9" rustyline = "11.0.0" -arrow-flight = { version = "35.0.0", features = ["flight-sql-experimental"] } -arrow = "35.0.0" +arrow-cast = { version = "51", features = ["prettyprint"] } +arrow-flight = { version = "51", features = ["flight-sql-experimental"] } +arrow = "51" futures = { version = "0.3", default-features = false, features = ["alloc"] } -arrow-cast = { version = "35.0.0", features = ["prettyprint"] } tokio = { version = "1.26", features = [ "macros", "rt", @@ -25,7 +25,7 @@ tokio = { version = "1.26", features = [ "parking_lot", ] } -tonic = { version = "0.8", default-features = false, features = [ +tonic = { version = "0.11", default-features = false, features = [ "transport", "codegen", "tls", diff --git a/src/main.rs b/src/main.rs index 6cb25e5..cf87171 100644 --- a/src/main.rs +++ b/src/main.rs @@ -69,7 +69,7 @@ fn print_usage() { fn endpoint(args: &Args, addr: String) -> Result { let mut endpoint = Endpoint::new(addr) - .map_err(|_| ArrowError::IoError("Cannot create endpoint".to_string()))? + .map_err(|_| ArrowError::IpcError("Cannot create endpoint".to_string()))? .connect_timeout(Duration::from_secs(20)) .timeout(Duration::from_secs(20)) .tcp_nodelay(true) // Disable Nagle's Algorithm since we don't want packets to wait @@ -82,7 +82,7 @@ fn endpoint(args: &Args, addr: String) -> Result { let tls_config = ClientTlsConfig::new(); endpoint = endpoint .tls_config(tls_config) - .map_err(|_| ArrowError::IoError("Cannot create TLS endpoint".to_string()))?; + .map_err(|_| ArrowError::IpcError("Cannot create TLS endpoint".to_string()))?; } Ok(endpoint) diff --git a/src/session.rs b/src/session.rs index 9259b79..b024a09 100644 --- a/src/session.rs +++ b/src/session.rs @@ -3,20 +3,18 @@ use arrow::error::ArrowError; use arrow::record_batch::RecordBatch; use arrow_cast::pretty::pretty_format_batches; use arrow_flight::sql::client::FlightSqlServiceClient; -use arrow_flight::utils::flight_data_to_batches; -use arrow_flight::FlightData; use futures::TryStreamExt; use rustyline::error::ReadlineError; use rustyline::history::DefaultHistory; use rustyline::Editor; use std::io::BufRead; use tokio::time::Instant; -use tonic::transport::Endpoint; +use tonic::transport::{Channel, Endpoint}; use crate::helper::CliHelper; pub struct Session { - client: FlightSqlServiceClient, + client: FlightSqlServiceClient, is_repl: bool, prompt: String, } @@ -31,7 +29,7 @@ impl Session { let channel = endpoint .connect() .await - .map_err(|err| ArrowError::IoError(err.to_string()))?; + .map_err(|err| ArrowError::IpcError(err.to_string()))?; if is_repl { println!("Welcome to Arrow CLI."); @@ -124,17 +122,15 @@ impl Session { } let start = Instant::now(); - let mut stmt = self.client.prepare(query.to_string()).await?; + let mut stmt = self.client.prepare(query.to_string(), None).await?; let flight_info = stmt.execute().await?; let ticket = flight_info.endpoint[0] .ticket .as_ref() - .ok_or_else(|| ArrowError::IoError("Ticket is emtpy".to_string()))?; + .ok_or_else(|| ArrowError::IpcError("Ticket is emtpy".to_string()))?; let flight_data = self.client.do_get(ticket.clone()).await?; - let flight_data: Vec = flight_data.try_collect().await.unwrap(); - - let batches = flight_data_to_batches(&flight_data)?; + let batches: Vec = flight_data.try_collect().await.unwrap(); if is_repl { let res = pretty_format_batches(batches.as_slice())?; @@ -161,7 +157,7 @@ fn print_batches_with_sep(batches: &[RecordBatch], delimiter: u8) -> Result