diff --git a/Cargo.lock b/Cargo.lock index 857e7e87ed..4e3f995826 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -181,9 +181,9 @@ dependencies = [ [[package]] name = "anyhow" -version = "1.0.86" +version = "1.0.89" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b3d1d046238990b9cf5bcde22a3fb3584ee5cf65fb2765f454ed428c7a0063da" +checksum = "86fdf8605db99b54d3cd748a44c6d04df638eb5dafb219b135d0149bd0db01f6" [[package]] name = "anymap2" @@ -2765,7 +2765,7 @@ dependencies = [ [[package]] name = "fluvio-extension-common" -version = "0.14.2" +version = "0.14.3" dependencies = [ "anyhow", "async-trait", @@ -3086,7 +3086,7 @@ dependencies = [ [[package]] name = "fluvio-socket" -version = "0.14.10" +version = "0.14.11" dependencies = [ "async-channel 1.9.0", "async-lock 3.4.0", diff --git a/crates/fluvio-extension-common/Cargo.toml b/crates/fluvio-extension-common/Cargo.toml index 9a95568d93..35033925ef 100644 --- a/crates/fluvio-extension-common/Cargo.toml +++ b/crates/fluvio-extension-common/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "fluvio-extension-common" -version = "0.14.2" +version = "0.14.3" edition = "2021" authors = ["Fluvio Contributors "] description = "Fluvio extension common" diff --git a/crates/fluvio-extension-common/src/lib.rs b/crates/fluvio-extension-common/src/lib.rs index 8db55e8a24..765e2e63fa 100644 --- a/crates/fluvio-extension-common/src/lib.rs +++ b/crates/fluvio-extension-common/src/lib.rs @@ -154,15 +154,9 @@ pub mod target { .into()); } - let config_file = ConfigFile::load(None)?; - let cluster = config_file - .config() - // NOTE: This will not fallback to current cluster like it did before - // Current cluster will be used when no profile is given. - .cluster_with_profile(&profile) - .ok_or_else(|| { - IoError::new(ErrorKind::Other, "Cluster not found for profile") - })?; + let cluster = FluvioConfig::load_with_profile(&profile)?.ok_or_else(|| { + IoError::new(ErrorKind::Other, "Cluster not found for profile") + })?; Ok(cluster.clone()) } (None, Some(cluster)) => { diff --git a/crates/fluvio-socket/Cargo.toml b/crates/fluvio-socket/Cargo.toml index d2a873c324..61f65a5ae5 100644 --- a/crates/fluvio-socket/Cargo.toml +++ b/crates/fluvio-socket/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "fluvio-socket" -version = "0.14.10" +version = "0.14.11" edition = "2021" authors = ["Fluvio Contributors "] description = "Provide TCP socket wrapper for fluvio protocol" diff --git a/crates/fluvio-socket/src/multiplexing.rs b/crates/fluvio-socket/src/multiplexing.rs index a34237de21..cadc14ec45 100644 --- a/crates/fluvio-socket/src/multiplexing.rs +++ b/crates/fluvio-socket/src/multiplexing.rs @@ -17,21 +17,21 @@ use async_channel::bounded; use async_channel::Receiver; use async_channel::Sender; use async_lock::Mutex; -use bytes::{Bytes}; +use bytes::Bytes; use event_listener::Event; -use fluvio_future::net::ConnectionFd; +use futures_util::ready; use futures_util::stream::{Stream, StreamExt}; use pin_project::{pin_project, pinned_drop}; use tokio::select; use tracing::{info, warn}; use tracing::{debug, error, trace, instrument}; +use fluvio_future::net::ConnectionFd; use fluvio_future::timer::sleep; -use futures_util::ready; use fluvio_protocol::api::Request; use fluvio_protocol::api::RequestHeader; use fluvio_protocol::api::RequestMessage; -use fluvio_protocol::{Decoder}; +use fluvio_protocol::Decoder; use crate::SocketError; use crate::ExclusiveFlvSink; diff --git a/crates/fluvio/src/config/cluster.rs b/crates/fluvio/src/config/cluster.rs index cf3ec8a726..3f98ad54d3 100644 --- a/crates/fluvio/src/config/cluster.rs +++ b/crates/fluvio/src/config/cluster.rs @@ -48,6 +48,14 @@ impl FluvioConfig { Ok(cluster_config.to_owned()) } + /// get cluster config from profile + /// if profile is not found, return None + pub fn load_with_profile(profile_name: &str) -> Result, FluvioError> { + let config_file = ConfigFile::load_default_or_new()?; + let cluster_config = config_file.config().cluster_with_profile(profile_name); + Ok(cluster_config.cloned()) + } + /// Create a new cluster configuration with no TLS. pub fn new(addr: impl Into) -> Self { Self { diff --git a/crates/fluvio/src/config/config.rs b/crates/fluvio/src/config/config.rs index 43e3aa6bc4..d89eb78bd0 100644 --- a/crates/fluvio/src/config/config.rs +++ b/crates/fluvio/src/config/config.rs @@ -503,6 +503,12 @@ pub mod test { let cluster = config.current_cluster().expect("cluster should exist"); assert_eq!(cluster.endpoint, "127.0.0.1:9003"); + // access from profile + config + .cluster_with_profile("local") + .expect("cluster should exists"); + // access from cluster + config.cluster("local").expect("cluster should exists"); } #[test] diff --git a/crates/fluvio/src/fluvio.rs b/crates/fluvio/src/fluvio.rs index 20fcc8f3d8..5e63f3689e 100644 --- a/crates/fluvio/src/fluvio.rs +++ b/crates/fluvio/src/fluvio.rs @@ -1,39 +1,31 @@ use std::convert::TryFrom; use std::sync::Arc; -use fluvio_sc_schema::partition::PartitionMirrorConfig; -use fluvio_sc_schema::topic::MirrorConfig; -use fluvio_sc_schema::topic::PartitionMap; -use fluvio_sc_schema::topic::ReplicaSpec; -use tracing::{debug, info}; +use anyhow::{Context, Result}; +use semver::Version; use tokio::sync::OnceCell; -use anyhow::Result; +use tracing::{debug, info}; +use fluvio_future::net::DomainConnector; +use fluvio_sc_schema::partition::PartitionMirrorConfig; +use fluvio_sc_schema::topic::{MirrorConfig, PartitionMap, ReplicaSpec}; use fluvio_sc_schema::objects::ObjectApiWatchRequest; use fluvio_types::PartitionId; use fluvio_socket::{ ClientConfig, Versions, VersionedSerialSocket, SharedMultiplexerSocket, MultiplexerSocket, }; -use fluvio_future::net::DomainConnector; -use semver::Version; use crate::admin::FluvioAdmin; use crate::error::anyhow_version_error; -use crate::producer::TopicProducerPool; -use crate::spu::SpuPool; -use crate::TopicProducer; -use crate::PartitionConsumer; - -use crate::FluvioError; -use crate::FluvioConfig; -use crate::consumer::{MultiplePartitionConsumer, PartitionSelectionStrategy}; use crate::consumer::{ - ConsumerStream, MultiplePartitionConsumerStream, Record, ConsumerConfigExt, ConsumerOffset, + MultiplePartitionConsumer, PartitionSelectionStrategy, ConsumerStream, + MultiplePartitionConsumerStream, Record, ConsumerConfigExt, ConsumerOffset, }; use crate::metrics::ClientMetrics; -use crate::producer::TopicProducerConfig; -use crate::spu::SpuSocketPool; +use crate::producer::{TopicProducerPool, TopicProducerConfig}; use crate::sync::MetadataStores; +use crate::spu::{SpuPool, SpuSocketPool}; +use crate::{TopicProducer, PartitionConsumer, FluvioError, FluvioConfig}; /// An interface for interacting with Fluvio streaming pub struct Fluvio { @@ -90,6 +82,26 @@ impl Fluvio { Self::connect_with_connector(connector, config).await } + /// Creates a new Fluvio client with the given profile + /// + /// # Example + /// + /// ```no_run + /// # use fluvio::{Fluvio, FluvioError, FluvioConfig}; + /// use fluvio::config::ConfigFile; + /// # async fn do_connect_with_profile_name() -> anyhow::Result<()> { + /// let fluvio = Fluvio::connect_with_profile("local").await?; + /// # Ok(()) + /// # } + /// ``` + pub async fn connect_with_profile(profile: &str) -> Result { + let config = FluvioConfig::load_with_profile(profile)?.context(format!( + "Failed to load cluster config with profile `{profile}`" + ))?; + Self::connect_with_config(&config).await + } + + /// Creates a new Fluvio client with the given connector and configuration pub async fn connect_with_connector( connector: DomainConnector, config: &FluvioConfig,