From cac61fdc8400d44856410621668d799d70a22224 Mon Sep 17 00:00:00 2001 From: yoshidan Date: Wed, 31 Jul 2024 19:36:08 +0900 Subject: [PATCH] split connection for streaming_pull --- pubsub/Cargo.toml | 2 +- pubsub/src/apiv1/subscriber_client.rs | 21 ++++++++++++++++----- pubsub/src/client.rs | 8 +++++++- pubsub/src/subscriber.rs | 2 +- pubsub/src/subscription.rs | 20 ++++++++++++++------ pubsub/src/topic.rs | 5 ++++- 6 files changed, 43 insertions(+), 15 deletions(-) diff --git a/pubsub/Cargo.toml b/pubsub/Cargo.toml index 5f986314..5c42f5b0 100644 --- a/pubsub/Cargo.toml +++ b/pubsub/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "google-cloud-pubsub" -version = "0.28.0" +version = "0.28.1" authors = ["yoshidan "] edition = "2021" repository = "https://github.com/yoshidan/google-cloud-rust/tree/main/pubsub" diff --git a/pubsub/src/apiv1/subscriber_client.rs b/pubsub/src/apiv1/subscriber_client.rs index f1189310..9b70d496 100644 --- a/pubsub/src/apiv1/subscriber_client.rs +++ b/pubsub/src/apiv1/subscriber_client.rs @@ -33,13 +33,17 @@ pub(crate) fn create_empty_streaming_pull_request() -> StreamingPullRequest { #[derive(Clone, Debug)] pub struct SubscriberClient { cm: Arc, + streaming_pull_cm: Arc, } #[allow(dead_code)] impl SubscriberClient { /// create new Subscriber client - pub fn new(cm: ConnectionManager) -> SubscriberClient { - SubscriberClient { cm: Arc::new(cm) } + pub fn new(cm: ConnectionManager, streaming_pull_cm: ConnectionManager) -> SubscriberClient { + SubscriberClient { + cm: Arc::new(cm), + streaming_pull_cm: Arc::new(streaming_pull_cm), + } } #[inline] @@ -49,8 +53,15 @@ impl SubscriberClient { .max_encoding_message_size(PUBSUB_MESSAGE_LIMIT) } - pub(crate) fn pool_size(&self) -> usize { - self.cm.num() + #[inline] + fn client_for_streaming_pull(&self) -> InternalSubscriberClient { + InternalSubscriberClient::new(self.streaming_pull_cm.conn()) + .max_decoding_message_size(PUBSUB_MESSAGE_LIMIT) + .max_encoding_message_size(PUBSUB_MESSAGE_LIMIT) + } + + pub(crate) fn streaming_pool_size(&self) -> usize { + self.streaming_pull_cm.num() } /// create_subscription creates a subscription to a given topic. See the [resource name rules] @@ -231,7 +242,7 @@ impl SubscriberClient { retry: Option, ) -> Result>, Status> { let action = || async { - let mut client = self.client(); + let mut client = self.client_for_streaming_pull(); let base_req = req.clone(); let rx = ping_receiver.clone(); let request = Box::pin(async_stream::stream! { diff --git a/pubsub/src/client.rs b/pubsub/src/client.rs index c0cace61..5cc7303b 100644 --- a/pubsub/src/client.rs +++ b/pubsub/src/client.rs @@ -126,8 +126,14 @@ impl Client { &config.connection_option, ) .await?, + ConnectionManager::new( + pool_size, + config.endpoint.as_str(), + &config.environment, + &config.connection_option, + ) + .await?, ); - Ok(Self { project_id: config.project_id.ok_or(Error::ProjectIdNotFound)?, pubc, diff --git a/pubsub/src/subscriber.rs b/pubsub/src/subscriber.rs index 390288c5..5a63c5dd 100644 --- a/pubsub/src/subscriber.rs +++ b/pubsub/src/subscriber.rs @@ -362,7 +362,7 @@ mod tests { .await .unwrap() }; - let subc = SubscriberClient::new(cm().await); + let subc = SubscriberClient::new(cm().await, cm().await); let pubc = PublisherClient::new(cm().await); pubc.publish( diff --git a/pubsub/src/subscription.rs b/pubsub/src/subscription.rs index e256a0d2..f0424ab3 100644 --- a/pubsub/src/subscription.rs +++ b/pubsub/src/subscription.rs @@ -211,8 +211,8 @@ impl Subscription { Self { fqsn, subc } } - pub(crate) fn pool_size(&self) -> usize { - self.subc.pool_size() + pub(crate) fn streaming_pool_size(&self) -> usize { + self.subc.streaming_pool_size() } /// id returns the unique identifier of the subscription within its project. @@ -459,7 +459,7 @@ impl Subscription { // spawn a separate subscriber task for each connection in the pool let subscribers = if opt.enable_multiple_subscriber { - self.pool_size() + self.streaming_pool_size() } else { 1 }; @@ -649,8 +649,8 @@ impl Subscription { /// - The message backlog on the subscription -- or to be specific, messages that are unacknowledged /// at the time of the subscription's creation. /// - All messages published to the subscription's topic after the snapshot's creation. - /// Snapshots have a finite lifetime -- a maximum of 7 days from the time of creation, beyond which - /// they are discarded and any messages being retained solely due to the snapshot dropped. + /// Snapshots have a finite lifetime -- a maximum of 7 days from the time of creation, beyond which + /// they are discarded and any messages being retained solely due to the snapshot dropped. pub async fn create_snapshot( &self, name: &str, @@ -740,7 +740,15 @@ mod tests { ) .await .unwrap(); - let client = SubscriberClient::new(cm); + let cm2 = ConnectionManager::new( + 4, + "", + &Environment::Emulator(EMULATOR.to_string()), + &ConnectionOptions::default(), + ) + .await + .unwrap(); + let client = SubscriberClient::new(cm, cm2); let uuid = Uuid::new_v4().hyphenated().to_string(); let subscription_name = format!("projects/{}/subscriptions/s{}", PROJECT_NAME, &uuid); diff --git a/pubsub/src/topic.rs b/pubsub/src/topic.rs index 36757028..55594103 100644 --- a/pubsub/src/topic.rs +++ b/pubsub/src/topic.rs @@ -164,7 +164,10 @@ mod tests { let cm2 = ConnectionManager::new(4, "", &environment, &ConnectionOptions::default()) .await .unwrap(); - let subc = SubscriberClient::new(cm2); + let cm3 = ConnectionManager::new(4, "", &environment, &ConnectionOptions::default()) + .await + .unwrap(); + let subc = SubscriberClient::new(cm2, cm3); let uuid = Uuid::new_v4().hyphenated().to_string(); let topic_name = format!("projects/local-project/topics/t{uuid}");