From 44844929f7140925349b28d1ed25cd7643fd7669 Mon Sep 17 00:00:00 2001
From: Naohiro Yoshida
Date: Mon, 5 Feb 2024 09:40:06 +0900
Subject: [PATCH] [pubsub] Fix ordering key error (#227)

---
 .github/workflows/ci.yaml             |   9 ++
 pubsub/Cargo.toml                     |   2 +-
 pubsub/src/apiv1/subscriber_client.rs |   2 +-
 pubsub/src/client.rs                  | 124 +++++++++++++++++++++++++-
 pubsub/src/publisher.rs               | 103 +++++++++++++++++++--
 pubsub/src/subscription.rs            |   5 +-
 6 files changed, 230 insertions(+), 15 deletions(-)

diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml
index 35f8aea5..f025f62a 100644
--- a/.github/workflows/ci.yaml
+++ b/.github/workflows/ci.yaml
@@ -74,6 +74,15 @@ jobs:
           PUBSUB_EMULATOR_HOST: localhost:8681
           RUSTFLAGS: "-A dead_code -A unused"
         run: cargo test --release --all-features --manifest-path pubsub/Cargo.toml
+      - name: Setup gcloud
+        uses: google-github-actions/setup-gcloud@v0.6.0
+        with:
+          service_account_key: ${{ secrets.STORAGE_CREDENTIALS }}
+          export_default_credentials: true
+      - name: test_in_gcp
+        env:
+          RUSTFLAGS: "-A dead_code -A unused"
+        run: cargo test --release --all-features --manifest-path pubsub/Cargo.toml -- --ignored
   spanner:
     name: spanner
     runs-on: ubuntu-latest
diff --git a/pubsub/Cargo.toml b/pubsub/Cargo.toml
index dbfd0dbf..be660a25 100644
--- a/pubsub/Cargo.toml
+++ b/pubsub/Cargo.toml
@@ -1,6 +1,6 @@
 [package]
 name = "google-cloud-pubsub"
-version = "0.22.0"
+version = "0.22.1"
 authors = ["yoshidan "]
 edition = "2021"
 repository = "https://github.com/yoshidan/google-cloud-rust/tree/main/pubsub"
diff --git a/pubsub/src/apiv1/subscriber_client.rs b/pubsub/src/apiv1/subscriber_client.rs
index 4e801467..f1189310 100644
--- a/pubsub/src/apiv1/subscriber_client.rs
+++ b/pubsub/src/apiv1/subscriber_client.rs
@@ -31,7 +31,7 @@ pub(crate) fn create_empty_streaming_pull_request() -> StreamingPullRequest {
 }
 
 #[derive(Clone, Debug)]
-pub(crate) struct SubscriberClient {
+pub struct SubscriberClient {
     cm: Arc<ConnectionManager>,
 }
diff --git a/pubsub/src/client.rs b/pubsub/src/client.rs
index 0259df9e..ff8c5c52 100644
--- a/pubsub/src/client.rs
+++ b/pubsub/src/client.rs
@@ -280,14 +280,13 @@ mod tests {
     use std::thread;
     use std::time::Duration;
 
-    use google_cloud_gax::conn::Environment;
     use serial_test::serial;
    use tokio_util::sync::CancellationToken;
     use uuid::Uuid;
 
     use google_cloud_googleapis::pubsub::v1::PubsubMessage;
 
-    use crate::client::{Client, ClientConfig};
+    use crate::client::Client;
     use crate::subscriber::SubscriberConfig;
     use crate::subscription::{ReceiveConfig, SubscriptionConfig};
 
@@ -455,12 +454,131 @@ mod tests {
         assert_eq!(1, subs_after.len() - subs.len());
         assert_eq!(1, snapshots_after.len() - snapshots.len());
     }
+}
+
+#[cfg(test)]
+mod tests_in_gcp {
+    use crate::client::{Client, ClientConfig};
+    use crate::publisher::PublisherConfig;
+    use google_cloud_gax::conn::Environment;
+    use google_cloud_googleapis::pubsub::v1::PubsubMessage;
+    use serial_test::serial;
+    use std::time::Duration;
+
+    fn make_msg(key: &str) -> PubsubMessage {
+        PubsubMessage {
+            data: if key.is_empty() {
+                "empty".into()
+            } else {
+                key.to_string().into()
+            },
+            ordering_key: key.into(),
+            ..Default::default()
+        }
+    }
 
     #[tokio::test]
+    #[ignore]
     async fn test_with_auth() {
         let config = ClientConfig::default().with_auth().await.unwrap();
-        if let Environment::GoogleCloud(_) = config.environment {
+        if let Environment::Emulator(_) = config.environment {
             unreachable!()
         }
     }
+
+    #[tokio::test]
+    #[serial]
+    #[ignore]
+    async fn test_publish_ordering_in_gcp_flush_buffer() {
+        let client = Client::new(ClientConfig::default().with_auth().await.unwrap())
+            .await
+            .unwrap();
+        let topic = client.topic("test-topic2");
+        let publisher = topic.new_publisher(Some(PublisherConfig {
+            flush_interval: Duration::from_secs(3),
+            workers: 3,
+            ..Default::default()
+        }));
+
+        let mut awaiters = vec![];
+        for key in ["", "key1", "key2", "key3", "key3"] {
+            awaiters.push(publisher.publish(make_msg(key)).await);
+        }
+        for awaiter in awaiters.into_iter() {
+            tracing::info!("msg id {}", awaiter.get().await.unwrap());
+        }
+
+        // check same key
+        let mut awaiters = vec![];
+        for key in ["", "key1", "key2", "key3", "key3"] {
+            awaiters.push(publisher.publish(make_msg(key)).await);
+        }
+        for awaiter in awaiters.into_iter() {
+            tracing::info!("msg id {}", awaiter.get().await.unwrap());
+        }
+    }
+
+    #[tokio::test]
+    #[serial]
+    #[ignore]
+    async fn test_publish_ordering_in_gcp_limit_exceed() {
+        let client = Client::new(ClientConfig::default().with_auth().await.unwrap())
+            .await
+            .unwrap();
+        let topic = client.topic("test-topic2");
+        let publisher = topic.new_publisher(Some(PublisherConfig {
+            flush_interval: Duration::from_secs(30),
+            workers: 1,
+            bundle_size: 8,
+            ..Default::default()
+        }));
+
+        let mut awaiters = vec![];
+        for key in ["", "key1", "key2", "key3", "key1", "key2", "key3", ""] {
+            awaiters.push(publisher.publish(make_msg(key)).await);
+        }
+        for awaiter in awaiters.into_iter() {
+            tracing::info!("msg id {}", awaiter.get().await.unwrap());
+        }
+
+        // check same key twice
+        let mut awaiters = vec![];
+        for key in ["", "key1", "key2", "key3", "key1", "key2", "key3", ""] {
+            awaiters.push(publisher.publish(make_msg(key)).await);
+        }
+        for awaiter in awaiters.into_iter() {
+            tracing::info!("msg id {}", awaiter.get().await.unwrap());
+        }
+    }
+
+    #[tokio::test]
+    #[serial]
+    #[ignore]
+    async fn test_publish_ordering_in_gcp_bulk() {
+        let client = Client::new(ClientConfig::default().with_auth().await.unwrap())
+            .await
+            .unwrap();
+        let topic = client.topic("test-topic2");
+        let publisher = topic.new_publisher(Some(PublisherConfig {
+            flush_interval: Duration::from_secs(30),
+            workers: 2,
+            bundle_size: 8,
+            ..Default::default()
+        }));
+
+        let msgs = ["", "", "key1", "key1", "key2", "key2", "key3", "key3"]
+            .map(make_msg)
+            .to_vec();
+        for awaiter in publisher.publish_bulk(msgs).await.into_iter() {
+            tracing::info!("msg id {}", awaiter.get().await.unwrap());
+        }
+
+        // check same key twice
+        let msgs = ["", "", "key1", "key1", "key2", "key2", "key3", "key3"]
+            .map(make_msg)
+            .to_vec();
+        for awaiter in publisher.publish_bulk(msgs).await.into_iter() {
+            tracing::info!("msg id {}", awaiter.get().await.unwrap());
+        }
+    }
 }
diff --git a/pubsub/src/publisher.rs b/pubsub/src/publisher.rs
index 4e351438..00daa9a1 100644
--- a/pubsub/src/publisher.rs
+++ b/pubsub/src/publisher.rs
@@ -1,4 +1,6 @@
 use std::collections::HashMap;
+use std::ops::{Deref, DerefMut};
+
 use std::sync::Arc;
 use std::time::Duration;
 
@@ -222,7 +224,8 @@ impl Tasks {
         bundle_size: usize,
     ) -> JoinHandle<()> {
         tokio::spawn(async move {
-            let mut bundle = Vec::<ReservedMessage>::with_capacity(bundle_size);
+            //TODO enable manage task by ordering_key
+            let mut bundle = MessageBundle::new();
             while !receiver.is_closed() {
                 let result = match timeout(flush_interval, &mut receiver.recv()).await {
                     Ok(result) => result,
@@ -230,8 +233,10 @@ impl Tasks {
                     Err(_e) => {
                         if !bundle.is_empty() {
                             tracing::trace!("elapsed: flush buffer : {}", topic);
-                            Self::flush(&mut client, topic.as_str(), bundle, retry.clone()).await;
-                            bundle = Vec::new();
+                            for value in bundle.key_by() {
+                                Self::flush(&mut client, topic.as_str(), value, retry.clone()).await;
+                            }
+                            bundle = MessageBundle::new();
                         }
                         continue;
                     }
@@ -243,9 +248,11 @@ impl Tasks {
                     Reserved::Multi(messages) => bundle.extend(messages),
                 }
                 if bundle.len() >= bundle_size {
-                    tracing::trace!("maximum buffer {} : {}", bundle.len(), topic);
-                    Self::flush(&mut client, topic.as_str(), bundle, retry.clone()).await;
-                    bundle = Vec::new();
+                    tracing::trace!("bundle size max: {}", topic);
+                    for value in bundle.key_by() {
+                        Self::flush(&mut client, topic.as_str(), value, retry.clone()).await;
+                    }
+                    bundle = MessageBundle::new();
                 }
             }
             //closed
@@ -256,7 +263,9 @@ impl Tasks {
             tracing::trace!("stop publisher : {}", topic);
             if !bundle.is_empty() {
                 tracing::trace!("flush rest buffer : {}", topic);
-                Self::flush(&mut client, topic.as_str(), bundle, retry.clone()).await;
+                for value in bundle.key_by() {
+                    Self::flush(&mut client, topic.as_str(), value, retry.clone()).await;
+                }
             }
         })
     }
@@ -314,3 +323,83 @@ impl Tasks {
         }
     }
 }
+
+struct MessageBundle {
+    inner: Vec<ReservedMessage>,
+}
+
+impl MessageBundle {
+    fn new() -> Self {
+        Self { inner: vec![] }
+    }
+
+    fn key_by(self) -> Vec<Vec<ReservedMessage>> {
+        let mut values = HashMap::<String, Vec<ReservedMessage>>::new();
+        for v in self.inner {
+            let key = v.message.ordering_key.to_string();
+            match values.get_mut(&key) {
+                Some(e) => {
+                    e.push(v);
+                }
+                None => {
+                    values.insert(key, vec![v]);
+                }
+            }
+        }
+        let mut result = Vec::with_capacity(values.len());
+        for (_, v) in values.into_iter() {
+            result.push(v);
+        }
+        result
+    }
+}
+
+impl Deref for MessageBundle {
+    type Target = Vec<ReservedMessage>;
+
+    fn deref(&self) -> &Self::Target {
+        &self.inner
+    }
+}
+
+impl DerefMut for MessageBundle {
+    fn deref_mut(&mut self) -> &mut Self::Target {
+        &mut self.inner
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use crate::publisher::{MessageBundle, ReservedMessage};
+    use google_cloud_googleapis::pubsub::v1::PubsubMessage;
+    use tokio::sync::oneshot;
+
+    fn msg(key: &str) -> ReservedMessage {
+        let (sender, _) = oneshot::channel();
+        ReservedMessage {
+            producer: sender,
+            message: PubsubMessage {
+                ordering_key: key.to_string(),
+                ..Default::default()
+            },
+        }
+    }
+
+    #[test]
+    fn test_message_bundle_key_by() {
+        let mut bundle = MessageBundle::new();
+        for key in ["", "a", "b", "c", "A", "", "D", "a"] {
+            bundle.push(msg(key));
+        }
+        let msgs = bundle.key_by();
+        assert_eq!(6, msgs.len());
+        for msg in msgs {
+            let key = msg.first().unwrap().message.ordering_key.clone();
+            if key == "a" || key.is_empty() {
+                assert_eq!(2, msg.len());
+            } else {
+                assert_eq!(1, msg.len());
+            }
+        }
+    }
+}
diff --git a/pubsub/src/subscription.rs b/pubsub/src/subscription.rs
index 4f83d749..b9031af6 100644
--- a/pubsub/src/subscription.rs
+++ b/pubsub/src/subscription.rs
@@ -201,7 +201,6 @@ impl Subscription {
         parts.join("/")
     }
 
-    #[allow(private_interfaces)]
     pub fn get_client(&self) -> SubscriberClient {
         self.subc.clone()
     }
@@ -1020,7 +1019,7 @@ mod tests {
 
         ack_all(&messages).await;
         assert_eq!(messages.len(), 1);
-        let message_publish_time = messages.get(0).unwrap().message.publish_time.to_owned().unwrap();
+        let message_publish_time = messages.first().unwrap().message.publish_time.to_owned().unwrap();
 
         // rewind to a timestamp where message was just published
         subscription
@@ -1032,7 +1031,7 @@ mod tests {
         let messages = subscription.pull(100, None).await.unwrap();
         ack_all(&messages).await;
         assert_eq!(messages.len(), 1);
-        let seek_message_publish_time = messages.get(0).unwrap().message.publish_time.to_owned().unwrap();
+        let seek_message_publish_time = messages.first().unwrap().message.publish_time.to_owned().unwrap();
         assert_eq!(seek_message_publish_time, message_publish_time);
 
         // cleanup