From 9652f24f6c1dce639444b8bb39caa0a9cd375870 Mon Sep 17 00:00:00 2001 From: Mohsin Zaidi <2236875+smrz2001@users.noreply.github.com> Date: Thu, 26 Jan 2023 18:24:35 -0500 Subject: [PATCH] feat: Pubsub.SeenMessagesStrategy (#9543) * feat: expire messages from the cache based on last seen time * docs: Pubsub.SeenMessagesStrategy Ref. https://github.com/libp2p/go-libp2p-pubsub/pull/513 Co-authored-by: Marcin Rataj --- config/pubsub.go | 27 ++++- core/node/groups.go | 13 ++ docs/changelogs/v0.18.md | 38 ++++++ docs/config.md | 28 ++++- docs/examples/kubo-as-a-library/go.mod | 4 +- docs/examples/kubo-as-a-library/go.sum | 8 +- go.mod | 4 +- go.sum | 8 +- .../integration/pubsub_msg_seen_cache_test.go | 114 ++++++++++++------ 9 files changed, 191 insertions(+), 53 deletions(-) diff --git a/config/pubsub.go b/config/pubsub.go index ba80843005a..36f9a9881d5 100644 --- a/config/pubsub.go +++ b/config/pubsub.go @@ -1,5 +1,24 @@ package config +const ( + // LastSeenMessagesStrategy is a strategy that calculates the TTL countdown + // based on the last time a Pubsub message is seen. This means that if a message + // is received and then seen again within the specified TTL window, it + // won't be emitted until the TTL countdown expires from the last time the + // message was seen. + LastSeenMessagesStrategy = "last-seen" + + // FirstSeenMessagesStrategy is a strategy that calculates the TTL + // countdown based on the first time a Pubsub message is seen. This means that if + // a message is received and then seen again within the specified TTL + // window, it won't be emitted. + FirstSeenMessagesStrategy = "first-seen" + + // DefaultSeenMessagesStrategy is the strategy that is used by default if + // no Pubsub.SeenMessagesStrategy is specified. + DefaultSeenMessagesStrategy = LastSeenMessagesStrategy +) + type PubsubConfig struct { // Router can be either floodsub (legacy) or gossipsub (new and // backwards compatible). @@ -12,7 +31,11 @@ type PubsubConfig struct { // Enable pubsub (--enable-pubsub-experiment) Enabled Flag `json:",omitempty"` - // SeenMessagesTTL configures the duration after which a previously seen - // message ID can be forgotten about. + // SeenMessagesTTL is a value that controls the time window within which + // duplicate messages will be identified and won't be emitted. SeenMessagesTTL *OptionalDuration `json:",omitempty"` + + // SeenMessagesStrategy is a setting that determines how the time-to-live + // (TTL) countdown for deduplicating messages is calculated. + SeenMessagesStrategy *OptionalString `json:",omitempty"` } diff --git a/core/node/groups.go b/core/node/groups.go index aa650ddf5d0..e640feff1ab 100644 --- a/core/node/groups.go +++ b/core/node/groups.go @@ -11,6 +11,7 @@ import ( "github.com/ipfs/go-log" "github.com/ipfs/kubo/config" pubsub "github.com/libp2p/go-libp2p-pubsub" + "github.com/libp2p/go-libp2p-pubsub/timecache" "github.com/libp2p/go-libp2p/core/peer" "github.com/ipfs/kubo/core/node/libp2p" @@ -66,6 +67,18 @@ func LibP2P(bcfg *BuildCfg, cfg *config.Config) fx.Option { pubsub.WithSeenMessagesTTL(cfg.Pubsub.SeenMessagesTTL.WithDefault(pubsub.TimeCacheDuration)), ) + var seenMessagesStrategy timecache.Strategy + configSeenMessagesStrategy := cfg.Pubsub.SeenMessagesStrategy.WithDefault(config.DefaultSeenMessagesStrategy) + switch configSeenMessagesStrategy { + case config.LastSeenMessagesStrategy: + seenMessagesStrategy = timecache.Strategy_LastSeen + case config.FirstSeenMessagesStrategy: + seenMessagesStrategy = timecache.Strategy_FirstSeen + default: + return fx.Error(fmt.Errorf("unsupported Pubsub.SeenMessagesStrategy %q", configSeenMessagesStrategy)) + } + pubsubOptions = append(pubsubOptions, pubsub.WithSeenMessagesStrategy(seenMessagesStrategy)) + switch cfg.Pubsub.Router { case "": fallthrough diff --git a/docs/changelogs/v0.18.md b/docs/changelogs/v0.18.md index e51c6d3b570..376658fd987 100644 --- a/docs/changelogs/v0.18.md +++ b/docs/changelogs/v0.18.md @@ -1,5 +1,43 @@ # Kubo changelog v0.18 +## v0.18.1 + +This release includes improvements around Pubsub message deduplication, and more. + + + + +- [Overview](#overview) +- [๐Ÿ”ฆ Highlights](#-highlights) + - [New default Pubsub.SeenMessagesStrategy](#new-default-pubsubseenmessagesstrategy) +- [๐Ÿ“ Changelog](#-changelog) +- [๐Ÿ‘จโ€๐Ÿ‘ฉโ€๐Ÿ‘งโ€๐Ÿ‘ฆ Contributors](#-contributors) + + + +### ๐Ÿ”ฆ Highlights + +#### New default `Pubsub.SeenMessagesStrategy` + +A new optional [`Pubsub.SeenMessagesStrategy`](../config.md#pubsubseenmessagesstrategy) configuration option has been added. + +This option allows you to choose between two different strategies for +deduplicating messages: `first-seen` and `last-seen`. + +When unset, the default strategy is `last-seen`, which calculates the +time-to-live (TTL) countdown based on the last time a message is seen. This +means that if a message is received and then seen again within the specified +TTL window based on the last time it was seen, it won't be emitted. + +If you prefer the old behavior, which calculates the TTL countdown based on the +first time a message is seen, you can set `Pubsub.SeenMessagesStrategy` to +`first-seen`. + +### ๐Ÿ“ Changelog + +### ๐Ÿ‘จโ€๐Ÿ‘ฉโ€๐Ÿ‘งโ€๐Ÿ‘ฆ Contributors + + ## v0.18.0 ### Overview diff --git a/docs/config.md b/docs/config.md index da919d84450..94d8094af4b 100644 --- a/docs/config.md +++ b/docs/config.md @@ -100,6 +100,7 @@ config file at runtime. - [`Pubsub.Router`](#pubsubrouter) - [`Pubsub.DisableSigning`](#pubsubdisablesigning) - [`Pubsub.SeenMessagesTTL`](#pubsubseenmessagesttl) + - [`Pubsub.SeenMessagesStrategy`](#pubsubseenmessagesstrategy) - [`Peering`](#peering) - [`Peering.Peers`](#peeringpeers) - [`Reprovider`](#reprovider) @@ -1206,8 +1207,8 @@ Type: `bool` ### `Pubsub.SeenMessagesTTL` -Configures the duration after which a previously seen Pubsub Message ID can be -forgotten about. +Controls the time window within which duplicate messages, identified by Message +ID, will be identified and won't be emitted again. A smaller value for this parameter means that Pubsub messages in the cache will be garbage collected sooner, which can result in a smaller cache. At the same @@ -1223,6 +1224,29 @@ Default: see `TimeCacheDuration` from [go-libp2p-pubsub](https://github.com/libp Type: `optionalDuration` +### `Pubsub.SeenMessagesStrategy` + +Determines how the time-to-live (TTL) countdown for deduplicating Pubsub +messages is calculated. + +The Pubsub seen messages cache is a LRU cache that keeps messages for up to a +specified time duration. After this duration has elapsed, expired messages will +be purged from the cache. + +The `last-seen` cache is a sliding-window cache. Every time a message is seen +again with the SeenMessagesTTL duration, its timestamp slides forward. This +keeps frequently occurring messages cached and prevents them from being +continually propagated, especially because of issues that might increase the +number of duplicate messages in the network. + +The `first-seen` cache will store new messages and purge them after the +SeenMessagesTTL duration, even if they are seen multiple times within this +duration. + +Default: `last-seen` (see [go-libp2p-pubsub](https://github.com/libp2p/go-libp2p-pubsub)) + +Type: `optionalString` + ## `Peering` Configures the peering subsystem. The peering subsystem configures Kubo to diff --git a/docs/examples/kubo-as-a-library/go.mod b/docs/examples/kubo-as-a-library/go.mod index 309f06aa9d0..66cc238d6b0 100644 --- a/docs/examples/kubo-as-a-library/go.mod +++ b/docs/examples/kubo-as-a-library/go.mod @@ -38,6 +38,7 @@ require ( github.com/docker/go-units v0.5.0 // indirect github.com/dustin/go-humanize v1.0.0 // indirect github.com/elastic/gosigar v0.14.2 // indirect + github.com/emirpasic/gods v1.18.1 // indirect github.com/facebookgo/atomicfile v0.0.0-20151019160806-2de1f203e7d5 // indirect github.com/flynn/noise v1.0.0 // indirect github.com/francoispqt/gojay v1.2.13 // indirect @@ -122,7 +123,7 @@ require ( github.com/libp2p/go-libp2p-asn-util v0.2.0 // indirect github.com/libp2p/go-libp2p-kad-dht v0.20.0 // indirect github.com/libp2p/go-libp2p-kbucket v0.5.0 // indirect - github.com/libp2p/go-libp2p-pubsub v0.8.2 // indirect + github.com/libp2p/go-libp2p-pubsub v0.8.3 // indirect github.com/libp2p/go-libp2p-pubsub-router v0.6.0 // indirect github.com/libp2p/go-libp2p-record v0.2.0 // indirect github.com/libp2p/go-libp2p-routing-helpers v0.6.0 // indirect @@ -180,7 +181,6 @@ require ( github.com/whyrusleeping/chunker v0.0.0-20181014151217-fe64bd25879f // indirect github.com/whyrusleeping/go-keyspace v0.0.0-20160322163242-5b898ac5add1 // indirect github.com/whyrusleeping/multiaddr-filter v0.0.0-20160516205228-e903e4adabd7 // indirect - github.com/whyrusleeping/timecache v0.0.0-20160911033111-cfcb2f1abfee // indirect go.opencensus.io v0.24.0 // indirect go.opentelemetry.io/otel v1.7.0 // indirect go.opentelemetry.io/otel/exporters/jaeger v1.7.0 // indirect diff --git a/docs/examples/kubo-as-a-library/go.sum b/docs/examples/kubo-as-a-library/go.sum index e194ef92eff..073bf7072b5 100644 --- a/docs/examples/kubo-as-a-library/go.sum +++ b/docs/examples/kubo-as-a-library/go.sum @@ -198,6 +198,8 @@ github.com/edsrzf/mmap-go v1.0.0/go.mod h1:YO35OhQPt3KJa3ryjFM5Bs14WD66h8eGKpfaB github.com/elastic/gosigar v0.12.0/go.mod h1:iXRIGg2tLnu7LBdpqzyQfGDEidKCfWcCMS0WKyPWoMs= github.com/elastic/gosigar v0.14.2 h1:Dg80n8cr90OZ7x+bAax/QjoW/XqTI11RmA79ZwIm9/4= github.com/elastic/gosigar v0.14.2/go.mod h1:iXRIGg2tLnu7LBdpqzyQfGDEidKCfWcCMS0WKyPWoMs= +github.com/emirpasic/gods v1.18.1 h1:FXtiHYKDGKCW2KzwZKx0iC0PQmdlorYgdFG9jPXJ1Bc= +github.com/emirpasic/gods v1.18.1/go.mod h1:8tpGGwCnJ5H4r6BWwaV6OrWmMoPhUl5jm/FMNAnJvWQ= github.com/envoyproxy/go-control-plane v0.6.9/go.mod h1:SBwIajubJHhxtWwsL9s8ss4safvEdbitLhGGK48rN6g= github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= @@ -774,8 +776,8 @@ github.com/libp2p/go-libp2p-peerstore v0.2.2/go.mod h1:NQxhNjWxf1d4w6PihR8btWIRj github.com/libp2p/go-libp2p-peerstore v0.2.6/go.mod h1:ss/TWTgHZTMpsU/oKVVPQCGuDHItOpf2W8RxAi50P2s= github.com/libp2p/go-libp2p-peerstore v0.2.7/go.mod h1:ss/TWTgHZTMpsU/oKVVPQCGuDHItOpf2W8RxAi50P2s= github.com/libp2p/go-libp2p-pnet v0.2.0/go.mod h1:Qqvq6JH/oMZGwqs3N1Fqhv8NVhrdYcO0BW4wssv21LA= -github.com/libp2p/go-libp2p-pubsub v0.8.2 h1:QLGUmkgKmwEVxVDYGsqc5t9CykOMY2Y21cXQHjR462I= -github.com/libp2p/go-libp2p-pubsub v0.8.2/go.mod h1:e4kT+DYjzPUYGZeWk4I+oxCSYTXizzXii5LDRRhjKSw= +github.com/libp2p/go-libp2p-pubsub v0.8.3 h1:T4+pcfcFm1K2v5oFyk68peSjVroaoM8zFygf6Y5WOww= +github.com/libp2p/go-libp2p-pubsub v0.8.3/go.mod h1:eje970FXxjhtFbVEoiae+VUw24ZoSlk67BsiZPLRzlw= github.com/libp2p/go-libp2p-pubsub-router v0.6.0 h1:D30iKdlqDt5ZmLEYhHELCMRj8b4sFAqrUcshIUvVP/s= github.com/libp2p/go-libp2p-pubsub-router v0.6.0/go.mod h1:FY/q0/RBTKsLA7l4vqC2cbRbOvyDotg8PJQ7j8FDudE= github.com/libp2p/go-libp2p-quic-transport v0.10.0/go.mod h1:RfJbZ8IqXIhxBRm5hqUEJqjiiY8xmEuq3HUDS993MkA= @@ -1285,8 +1287,6 @@ github.com/whyrusleeping/mdns v0.0.0-20180901202407-ef14215e6b30/go.mod h1:j4l84 github.com/whyrusleeping/mdns v0.0.0-20190826153040-b9b60ed33aa9/go.mod h1:j4l84WPFclQPj320J9gp0XwNKBb3U0zt5CBqjPp22G4= github.com/whyrusleeping/multiaddr-filter v0.0.0-20160516205228-e903e4adabd7 h1:E9S12nwJwEOXe2d6gT6qxdvqMnNq+VnSsKPgm2ZZNds= github.com/whyrusleeping/multiaddr-filter v0.0.0-20160516205228-e903e4adabd7/go.mod h1:X2c0RVCI1eSUFI8eLcY3c0423ykwiUdxLJtkDvruhjI= -github.com/whyrusleeping/timecache v0.0.0-20160911033111-cfcb2f1abfee h1:lYbXeSvJi5zk5GLKVuid9TVjS9a0OmLIDKTfoZBL6Ow= -github.com/whyrusleeping/timecache v0.0.0-20160911033111-cfcb2f1abfee/go.mod h1:m2aV4LZI4Aez7dP5PMyVKEHhUyEJ/RjmPEDOpDvudHg= github.com/x-cray/logrus-prefixed-formatter v0.5.2/go.mod h1:2duySbKsL6M18s5GU7VPsoEPHyzalCE06qoARUCeBBE= github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI= github.com/xdg-go/scram v1.0.2/go.mod h1:1WAq6h33pAW+iRreB34OORO2Nf7qel3VV3fjBj+hCSs= diff --git a/go.mod b/go.mod index 273c0151001..f0b992f3088 100644 --- a/go.mod +++ b/go.mod @@ -74,7 +74,7 @@ require ( github.com/libp2p/go-libp2p-http v0.4.0 github.com/libp2p/go-libp2p-kad-dht v0.20.0 github.com/libp2p/go-libp2p-kbucket v0.5.0 - github.com/libp2p/go-libp2p-pubsub v0.8.2 + github.com/libp2p/go-libp2p-pubsub v0.8.3 github.com/libp2p/go-libp2p-pubsub-router v0.6.0 github.com/libp2p/go-libp2p-record v0.2.0 github.com/libp2p/go-libp2p-routing-helpers v0.6.0 @@ -132,6 +132,7 @@ require ( github.com/dgraph-io/ristretto v0.0.2 // indirect github.com/docker/go-units v0.5.0 // indirect github.com/elastic/gosigar v0.14.2 // indirect + github.com/emirpasic/gods v1.18.1 // indirect github.com/felixge/httpsnoop v1.0.2 // indirect github.com/flynn/noise v1.0.0 // indirect github.com/francoispqt/gojay v1.2.13 // indirect @@ -223,7 +224,6 @@ require ( github.com/whyrusleeping/cbor-gen v0.0.0-20221220214510-0333c149dec0 // indirect github.com/whyrusleeping/chunker v0.0.0-20181014151217-fe64bd25879f // indirect github.com/whyrusleeping/go-keyspace v0.0.0-20160322163242-5b898ac5add1 // indirect - github.com/whyrusleeping/timecache v0.0.0-20160911033111-cfcb2f1abfee // indirect go.opentelemetry.io/otel/exporters/otlp/internal/retry v1.7.0 // indirect go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.7.0 // indirect go.opentelemetry.io/otel/metric v0.30.0 // indirect diff --git a/go.sum b/go.sum index ab1548f378f..057b7f21b8d 100644 --- a/go.sum +++ b/go.sum @@ -206,6 +206,8 @@ github.com/elastic/gosigar v0.14.2 h1:Dg80n8cr90OZ7x+bAax/QjoW/XqTI11RmA79ZwIm9/ github.com/elastic/gosigar v0.14.2/go.mod h1:iXRIGg2tLnu7LBdpqzyQfGDEidKCfWcCMS0WKyPWoMs= github.com/elgris/jsondiff v0.0.0-20160530203242-765b5c24c302 h1:QV0ZrfBLpFc2KDk+a4LJefDczXnonRwrYrQJY/9L4dA= github.com/elgris/jsondiff v0.0.0-20160530203242-765b5c24c302/go.mod h1:qBlWZqWeVx9BjvqBsnC/8RUlAYpIFmPvgROcw0n1scE= +github.com/emirpasic/gods v1.18.1 h1:FXtiHYKDGKCW2KzwZKx0iC0PQmdlorYgdFG9jPXJ1Bc= +github.com/emirpasic/gods v1.18.1/go.mod h1:8tpGGwCnJ5H4r6BWwaV6OrWmMoPhUl5jm/FMNAnJvWQ= github.com/envoyproxy/go-control-plane v0.6.9/go.mod h1:SBwIajubJHhxtWwsL9s8ss4safvEdbitLhGGK48rN6g= github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= @@ -808,8 +810,8 @@ github.com/libp2p/go-libp2p-peerstore v0.2.2/go.mod h1:NQxhNjWxf1d4w6PihR8btWIRj github.com/libp2p/go-libp2p-peerstore v0.2.6/go.mod h1:ss/TWTgHZTMpsU/oKVVPQCGuDHItOpf2W8RxAi50P2s= github.com/libp2p/go-libp2p-peerstore v0.2.7/go.mod h1:ss/TWTgHZTMpsU/oKVVPQCGuDHItOpf2W8RxAi50P2s= github.com/libp2p/go-libp2p-pnet v0.2.0/go.mod h1:Qqvq6JH/oMZGwqs3N1Fqhv8NVhrdYcO0BW4wssv21LA= -github.com/libp2p/go-libp2p-pubsub v0.8.2 h1:QLGUmkgKmwEVxVDYGsqc5t9CykOMY2Y21cXQHjR462I= -github.com/libp2p/go-libp2p-pubsub v0.8.2/go.mod h1:e4kT+DYjzPUYGZeWk4I+oxCSYTXizzXii5LDRRhjKSw= +github.com/libp2p/go-libp2p-pubsub v0.8.3 h1:T4+pcfcFm1K2v5oFyk68peSjVroaoM8zFygf6Y5WOww= +github.com/libp2p/go-libp2p-pubsub v0.8.3/go.mod h1:eje970FXxjhtFbVEoiae+VUw24ZoSlk67BsiZPLRzlw= github.com/libp2p/go-libp2p-pubsub-router v0.6.0 h1:D30iKdlqDt5ZmLEYhHELCMRj8b4sFAqrUcshIUvVP/s= github.com/libp2p/go-libp2p-pubsub-router v0.6.0/go.mod h1:FY/q0/RBTKsLA7l4vqC2cbRbOvyDotg8PJQ7j8FDudE= github.com/libp2p/go-libp2p-quic-transport v0.10.0/go.mod h1:RfJbZ8IqXIhxBRm5hqUEJqjiiY8xmEuq3HUDS993MkA= @@ -1344,8 +1346,6 @@ github.com/whyrusleeping/mdns v0.0.0-20180901202407-ef14215e6b30/go.mod h1:j4l84 github.com/whyrusleeping/mdns v0.0.0-20190826153040-b9b60ed33aa9/go.mod h1:j4l84WPFclQPj320J9gp0XwNKBb3U0zt5CBqjPp22G4= github.com/whyrusleeping/multiaddr-filter v0.0.0-20160516205228-e903e4adabd7 h1:E9S12nwJwEOXe2d6gT6qxdvqMnNq+VnSsKPgm2ZZNds= github.com/whyrusleeping/multiaddr-filter v0.0.0-20160516205228-e903e4adabd7/go.mod h1:X2c0RVCI1eSUFI8eLcY3c0423ykwiUdxLJtkDvruhjI= -github.com/whyrusleeping/timecache v0.0.0-20160911033111-cfcb2f1abfee h1:lYbXeSvJi5zk5GLKVuid9TVjS9a0OmLIDKTfoZBL6Ow= -github.com/whyrusleeping/timecache v0.0.0-20160911033111-cfcb2f1abfee/go.mod h1:m2aV4LZI4Aez7dP5PMyVKEHhUyEJ/RjmPEDOpDvudHg= github.com/x-cray/logrus-prefixed-formatter v0.5.2/go.mod h1:2duySbKsL6M18s5GU7VPsoEPHyzalCE06qoARUCeBBE= github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI= github.com/xdg-go/scram v1.0.2/go.mod h1:1WAq6h33pAW+iRreB34OORO2Nf7qel3VV3fjBj+hCSs= diff --git a/test/integration/pubsub_msg_seen_cache_test.go b/test/integration/pubsub_msg_seen_cache_test.go index 394cda5b120..7495080893d 100644 --- a/test/integration/pubsub_msg_seen_cache_test.go +++ b/test/integration/pubsub_msg_seen_cache_test.go @@ -22,6 +22,7 @@ import ( "github.com/libp2p/go-libp2p-pubsub" "github.com/libp2p/go-libp2p-pubsub/pb" + "github.com/libp2p/go-libp2p-pubsub/timecache" "github.com/libp2p/go-libp2p/core/peer" mock "github.com/ipfs/kubo/core/mock" @@ -76,7 +77,6 @@ func RunMessageSeenCacheTTLTest(t *testing.T, seenMessagesCacheTTL string) error var bootstrapNode, consumerNode, producerNode *core.IpfsNode var bootstrapPeerID, consumerPeerID, producerPeerID peer.ID - sendDupMsg := false mn := mocknet.New() bootstrapNode, err := mockNode(ctx, mn, false, "") // no need for PubSub configuration @@ -98,6 +98,12 @@ func RunMessageSeenCacheTTLTest(t *testing.T, seenMessagesCacheTTL string) error t.Fatal(err) } + // Used for logging the timeline + startTime := time.Time{} + + // Used for overriding the message ID + sendMsgID := "" + // Set up the pubsub message ID generation override for the producer core.RegisterFXOptionFunc(func(info core.FXNodeInfo) ([]fx.Option, error) { var pubsubOptions []pubsub.Option @@ -105,19 +111,23 @@ func RunMessageSeenCacheTTLTest(t *testing.T, seenMessagesCacheTTL string) error pubsubOptions, pubsub.WithSeenMessagesTTL(ttl), pubsub.WithMessageIdFn(func(pmsg *pubsub_pb.Message) string { - now := time.Now().Format(time.StampMilli) + now := time.Now() + if startTime.Second() == 0 { + startTime = now + } + timeElapsed := now.Sub(startTime).Seconds() msg := string(pmsg.Data) - var msgID string from, _ := peer.IDFromBytes(pmsg.From) - if (from == producerPeerID) && sendDupMsg { - msgID = "DupMsg" - t.Logf("sending [%s] with duplicate message ID at [%s]", msg, now) + var msgID string + if from == producerPeerID { + msgID = sendMsgID + t.Logf("sending [%s] with message ID [%s] at T%fs", msg, msgID, timeElapsed) } else { msgID = pubsub.DefaultMsgIdFn(pmsg) - t.Logf("sending [%s] with unique message ID at [%s]", msg, now) } return msgID }), + pubsub.WithSeenMessagesStrategy(timecache.Strategy_LastSeen), ) return append( info.FXOptions, @@ -165,8 +175,8 @@ func RunMessageSeenCacheTTLTest(t *testing.T, seenMessagesCacheTTL string) error t.Fatal(err) } // Utility functions defined inline to include context in closure - now := func() string { - return time.Now().Format(time.StampMilli) + now := func() float64 { + return time.Since(startTime).Seconds() } ctr := 0 msgGen := func() string { @@ -188,57 +198,87 @@ func RunMessageSeenCacheTTLTest(t *testing.T, seenMessagesCacheTTL string) error msg, err := consumerSubscription.Next(rxCtx) if shouldFind { if err != nil { - t.Logf("did not receive [%s] by [%s]", msgTxt, now()) + t.Logf("expected but did not receive [%s] at T%fs", msgTxt, now()) t.Fatal(err) } - t.Logf("received [%s] at [%s]", string(msg.Data()), now()) + t.Logf("received [%s] at T%fs", string(msg.Data()), now()) if !bytes.Equal(msg.Data(), []byte(msgTxt)) { t.Fatalf("consumed data [%s] does not match published data [%s]", string(msg.Data()), msgTxt) } } else { if err == nil { - t.Logf("received [%s] at [%s]", string(msg.Data()), now()) + t.Logf("not expected but received [%s] at T%fs", string(msg.Data()), now()) t.Fail() } - t.Logf("did not receive [%s] by [%s]", msgTxt, now()) + t.Logf("did not receive [%s] at T%fs", msgTxt, now()) } } - // Send message 1 with the message ID we're going to duplicate later - sendDupMsg = true + const MsgID1 = "MsgID1" + const MsgID2 = "MsgID2" + const MsgID3 = "MsgID3" + + // Send message 1 with the message ID we're going to duplicate + sentMsg1 := time.Now() + sendMsgID = MsgID1 msgTxt := produceMessage() - consumeMessage(msgTxt, true) // should find message + // Should find the message because it's new + consumeMessage(msgTxt, true) - // Send message 2 with the same message ID as before - sendDupMsg = true + // Send message 2 with a duplicate message ID + sendMsgID = MsgID1 msgTxt = produceMessage() - consumeMessage(msgTxt, false) // should NOT find message, because it got deduplicated (sent twice within the SeenMessagesTTL window) - - // Wait for seen cache TTL time to let seen cache entries time out - time.Sleep(ttl) + // Should NOT find message because it got deduplicated (sent 2 times within the SeenMessagesTTL window). + consumeMessage(msgTxt, false) // Send message 3 with a new message ID - // - // This extra step is necessary for testing the cache TTL because the PubSub code only garbage collects when a - // message ID was not already present in the cache. This means that message 2's cache entry, even though it has - // technically timed out, will still cause the message to be considered duplicate. When a message with a different - // ID passes through, it will be added to the cache and garbage collection will clean up message 2's entry. This is - // another bug in the pubsub/cache implementation that will be fixed once the code is refactored for this issue: - // https://github.com/libp2p/go-libp2p-pubsub/issues/502 - sendDupMsg = false + sendMsgID = MsgID2 msgTxt = produceMessage() - consumeMessage(msgTxt, true) // should find message + // Should find the message because it's new + consumeMessage(msgTxt, true) + + // Wait till just before the SeenMessagesTTL window has passed since message 1 was sent + time.Sleep(time.Until(sentMsg1.Add(ttl - 100*time.Millisecond))) + + // Send message 4 with a duplicate message ID + sendMsgID = MsgID1 + msgTxt = produceMessage() + // Should NOT find the message because it got deduplicated (sent 3 times within the SeenMessagesTTL window). This + // time, however, the expiration for the message should also get pushed out for a whole SeenMessagesTTL window since + // the default time cache now implements a sliding window algorithm. + consumeMessage(msgTxt, false) + + // Send message 5 with a duplicate message ID. This will be a second after the last attempt above since NOT finding + // a message takes a second to determine. That would put this attempt at ~1 second after the SeenMessagesTTL window + // starting at message 1 has expired. + sentMsg5 := time.Now() + sendMsgID = MsgID1 + msgTxt = produceMessage() + // Should NOT find the message, because it got deduplicated (sent 2 times since the updated SeenMessagesTTL window + // started). This time again, the expiration should get pushed out for another SeenMessagesTTL window. + consumeMessage(msgTxt, false) + + // Send message 6 with a message ID that hasn't been seen within a SeenMessagesTTL window + sendMsgID = MsgID2 + msgTxt = produceMessage() + // Should find the message since last read > SeenMessagesTTL, so it looks like a new message. + consumeMessage(msgTxt, true) + + // Sleep for a full SeenMessagesTTL window to let cache entries time out + time.Sleep(time.Until(sentMsg5.Add(ttl + 100*time.Millisecond))) - // Send message 4 with the same message ID as before - sendDupMsg = true + // Send message 7 with a duplicate message ID + sendMsgID = MsgID1 msgTxt = produceMessage() - consumeMessage(msgTxt, true) // should find message again (time since the last read > SeenMessagesTTL, so it looks like a new message). + // Should find the message this time since last read > SeenMessagesTTL, so it looks like a new message. + consumeMessage(msgTxt, true) - // Send message 5 with a new message ID + // Send message 8 with a brand new message ID // // This step is not strictly necessary, but has been added for good measure. - sendDupMsg = false + sendMsgID = MsgID3 msgTxt = produceMessage() - consumeMessage(msgTxt, true) // should find message + // Should find the message because it's new + consumeMessage(msgTxt, true) return nil }