Skip to content

Commit

Permalink
feat: Pubsub.SeenMessagesStrategy (#9543)
Browse files Browse the repository at this point in the history
* feat: expire messages from the cache based on last seen time
* docs: Pubsub.SeenMessagesStrategy

Ref. libp2p/go-libp2p-pubsub#513

Co-authored-by: Marcin Rataj <lidel@lidel.org>
  • Loading branch information
smrz2001 and lidel authored Jan 26, 2023
1 parent f20c980 commit 9652f24
Show file tree
Hide file tree
Showing 9 changed files with 191 additions and 53 deletions.
27 changes: 25 additions & 2 deletions config/pubsub.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,24 @@
package config

const (
// LastSeenMessagesStrategy is a strategy that calculates the TTL countdown
// based on the last time a Pubsub message is seen. This means that if a message
// is received and then seen again within the specified TTL window, it
// won't be emitted until the TTL countdown expires from the last time the
// message was seen.
LastSeenMessagesStrategy = "last-seen"

// FirstSeenMessagesStrategy is a strategy that calculates the TTL
// countdown based on the first time a Pubsub message is seen. This means that if
// a message is received and then seen again within the specified TTL
// window, it won't be emitted.
FirstSeenMessagesStrategy = "first-seen"

// DefaultSeenMessagesStrategy is the strategy that is used by default if
// no Pubsub.SeenMessagesStrategy is specified.
DefaultSeenMessagesStrategy = LastSeenMessagesStrategy
)

type PubsubConfig struct {
// Router can be either floodsub (legacy) or gossipsub (new and
// backwards compatible).
Expand All @@ -12,7 +31,11 @@ type PubsubConfig struct {
// Enable pubsub (--enable-pubsub-experiment)
Enabled Flag `json:",omitempty"`

// SeenMessagesTTL configures the duration after which a previously seen
// message ID can be forgotten about.
// SeenMessagesTTL is a value that controls the time window within which
// duplicate messages will be identified and won't be emitted.
SeenMessagesTTL *OptionalDuration `json:",omitempty"`

// SeenMessagesStrategy is a setting that determines how the time-to-live
// (TTL) countdown for deduplicating messages is calculated.
SeenMessagesStrategy *OptionalString `json:",omitempty"`
}
13 changes: 13 additions & 0 deletions core/node/groups.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/ipfs/go-log"
"github.com/ipfs/kubo/config"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/libp2p/go-libp2p-pubsub/timecache"
"github.com/libp2p/go-libp2p/core/peer"

"github.com/ipfs/kubo/core/node/libp2p"
Expand Down Expand Up @@ -66,6 +67,18 @@ func LibP2P(bcfg *BuildCfg, cfg *config.Config) fx.Option {
pubsub.WithSeenMessagesTTL(cfg.Pubsub.SeenMessagesTTL.WithDefault(pubsub.TimeCacheDuration)),
)

var seenMessagesStrategy timecache.Strategy
configSeenMessagesStrategy := cfg.Pubsub.SeenMessagesStrategy.WithDefault(config.DefaultSeenMessagesStrategy)
switch configSeenMessagesStrategy {
case config.LastSeenMessagesStrategy:
seenMessagesStrategy = timecache.Strategy_LastSeen
case config.FirstSeenMessagesStrategy:
seenMessagesStrategy = timecache.Strategy_FirstSeen
default:
return fx.Error(fmt.Errorf("unsupported Pubsub.SeenMessagesStrategy %q", configSeenMessagesStrategy))
}
pubsubOptions = append(pubsubOptions, pubsub.WithSeenMessagesStrategy(seenMessagesStrategy))

switch cfg.Pubsub.Router {
case "":
fallthrough
Expand Down
38 changes: 38 additions & 0 deletions docs/changelogs/v0.18.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,43 @@
# Kubo changelog v0.18

## v0.18.1

This release includes improvements around Pubsub message deduplication, and more.


<!-- TOC depthfrom:3 -->

- [Overview](#overview)
- [🔦 Highlights](#-highlights)
- [New default Pubsub.SeenMessagesStrategy](#new-default-pubsubseenmessagesstrategy)
- [📝 Changelog](#-changelog)
- [👨‍👩‍👧‍👦 Contributors](#-contributors)

<!-- /TOC -->

### 🔦 Highlights

#### New default `Pubsub.SeenMessagesStrategy`

A new optional [`Pubsub.SeenMessagesStrategy`](../config.md#pubsubseenmessagesstrategy) configuration option has been added.

This option allows you to choose between two different strategies for
deduplicating messages: `first-seen` and `last-seen`.

When unset, the default strategy is `last-seen`, which calculates the
time-to-live (TTL) countdown based on the last time a message is seen. This
means that if a message is received and then seen again within the specified
TTL window based on the last time it was seen, it won't be emitted.

If you prefer the old behavior, which calculates the TTL countdown based on the
first time a message is seen, you can set `Pubsub.SeenMessagesStrategy` to
`first-seen`.

### 📝 Changelog

### 👨‍👩‍👧‍👦 Contributors


## v0.18.0

### Overview
Expand Down
28 changes: 26 additions & 2 deletions docs/config.md
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ config file at runtime.
- [`Pubsub.Router`](#pubsubrouter)
- [`Pubsub.DisableSigning`](#pubsubdisablesigning)
- [`Pubsub.SeenMessagesTTL`](#pubsubseenmessagesttl)
- [`Pubsub.SeenMessagesStrategy`](#pubsubseenmessagesstrategy)
- [`Peering`](#peering)
- [`Peering.Peers`](#peeringpeers)
- [`Reprovider`](#reprovider)
Expand Down Expand Up @@ -1206,8 +1207,8 @@ Type: `bool`

### `Pubsub.SeenMessagesTTL`

Configures the duration after which a previously seen Pubsub Message ID can be
forgotten about.
Controls the time window within which duplicate messages, identified by Message
ID, will be identified and won't be emitted again.

A smaller value for this parameter means that Pubsub messages in the cache will
be garbage collected sooner, which can result in a smaller cache. At the same
Expand All @@ -1223,6 +1224,29 @@ Default: see `TimeCacheDuration` from [go-libp2p-pubsub](https://github.com/libp

Type: `optionalDuration`

### `Pubsub.SeenMessagesStrategy`

Determines how the time-to-live (TTL) countdown for deduplicating Pubsub
messages is calculated.

The Pubsub seen messages cache is a LRU cache that keeps messages for up to a
specified time duration. After this duration has elapsed, expired messages will
be purged from the cache.

The `last-seen` cache is a sliding-window cache. Every time a message is seen
again with the SeenMessagesTTL duration, its timestamp slides forward. This
keeps frequently occurring messages cached and prevents them from being
continually propagated, especially because of issues that might increase the
number of duplicate messages in the network.

The `first-seen` cache will store new messages and purge them after the
SeenMessagesTTL duration, even if they are seen multiple times within this
duration.

Default: `last-seen` (see [go-libp2p-pubsub](https://github.com/libp2p/go-libp2p-pubsub))

Type: `optionalString`

## `Peering`

Configures the peering subsystem. The peering subsystem configures Kubo to
Expand Down
4 changes: 2 additions & 2 deletions docs/examples/kubo-as-a-library/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ require (
github.com/docker/go-units v0.5.0 // indirect
github.com/dustin/go-humanize v1.0.0 // indirect
github.com/elastic/gosigar v0.14.2 // indirect
github.com/emirpasic/gods v1.18.1 // indirect
github.com/facebookgo/atomicfile v0.0.0-20151019160806-2de1f203e7d5 // indirect
github.com/flynn/noise v1.0.0 // indirect
github.com/francoispqt/gojay v1.2.13 // indirect
Expand Down Expand Up @@ -122,7 +123,7 @@ require (
github.com/libp2p/go-libp2p-asn-util v0.2.0 // indirect
github.com/libp2p/go-libp2p-kad-dht v0.20.0 // indirect
github.com/libp2p/go-libp2p-kbucket v0.5.0 // indirect
github.com/libp2p/go-libp2p-pubsub v0.8.2 // indirect
github.com/libp2p/go-libp2p-pubsub v0.8.3 // indirect
github.com/libp2p/go-libp2p-pubsub-router v0.6.0 // indirect
github.com/libp2p/go-libp2p-record v0.2.0 // indirect
github.com/libp2p/go-libp2p-routing-helpers v0.6.0 // indirect
Expand Down Expand Up @@ -180,7 +181,6 @@ require (
github.com/whyrusleeping/chunker v0.0.0-20181014151217-fe64bd25879f // indirect
github.com/whyrusleeping/go-keyspace v0.0.0-20160322163242-5b898ac5add1 // indirect
github.com/whyrusleeping/multiaddr-filter v0.0.0-20160516205228-e903e4adabd7 // indirect
github.com/whyrusleeping/timecache v0.0.0-20160911033111-cfcb2f1abfee // indirect
go.opencensus.io v0.24.0 // indirect
go.opentelemetry.io/otel v1.7.0 // indirect
go.opentelemetry.io/otel/exporters/jaeger v1.7.0 // indirect
Expand Down
8 changes: 4 additions & 4 deletions docs/examples/kubo-as-a-library/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,8 @@ github.com/edsrzf/mmap-go v1.0.0/go.mod h1:YO35OhQPt3KJa3ryjFM5Bs14WD66h8eGKpfaB
github.com/elastic/gosigar v0.12.0/go.mod h1:iXRIGg2tLnu7LBdpqzyQfGDEidKCfWcCMS0WKyPWoMs=
github.com/elastic/gosigar v0.14.2 h1:Dg80n8cr90OZ7x+bAax/QjoW/XqTI11RmA79ZwIm9/4=
github.com/elastic/gosigar v0.14.2/go.mod h1:iXRIGg2tLnu7LBdpqzyQfGDEidKCfWcCMS0WKyPWoMs=
github.com/emirpasic/gods v1.18.1 h1:FXtiHYKDGKCW2KzwZKx0iC0PQmdlorYgdFG9jPXJ1Bc=
github.com/emirpasic/gods v1.18.1/go.mod h1:8tpGGwCnJ5H4r6BWwaV6OrWmMoPhUl5jm/FMNAnJvWQ=
github.com/envoyproxy/go-control-plane v0.6.9/go.mod h1:SBwIajubJHhxtWwsL9s8ss4safvEdbitLhGGK48rN6g=
github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
Expand Down Expand Up @@ -774,8 +776,8 @@ github.com/libp2p/go-libp2p-peerstore v0.2.2/go.mod h1:NQxhNjWxf1d4w6PihR8btWIRj
github.com/libp2p/go-libp2p-peerstore v0.2.6/go.mod h1:ss/TWTgHZTMpsU/oKVVPQCGuDHItOpf2W8RxAi50P2s=
github.com/libp2p/go-libp2p-peerstore v0.2.7/go.mod h1:ss/TWTgHZTMpsU/oKVVPQCGuDHItOpf2W8RxAi50P2s=
github.com/libp2p/go-libp2p-pnet v0.2.0/go.mod h1:Qqvq6JH/oMZGwqs3N1Fqhv8NVhrdYcO0BW4wssv21LA=
github.com/libp2p/go-libp2p-pubsub v0.8.2 h1:QLGUmkgKmwEVxVDYGsqc5t9CykOMY2Y21cXQHjR462I=
github.com/libp2p/go-libp2p-pubsub v0.8.2/go.mod h1:e4kT+DYjzPUYGZeWk4I+oxCSYTXizzXii5LDRRhjKSw=
github.com/libp2p/go-libp2p-pubsub v0.8.3 h1:T4+pcfcFm1K2v5oFyk68peSjVroaoM8zFygf6Y5WOww=
github.com/libp2p/go-libp2p-pubsub v0.8.3/go.mod h1:eje970FXxjhtFbVEoiae+VUw24ZoSlk67BsiZPLRzlw=
github.com/libp2p/go-libp2p-pubsub-router v0.6.0 h1:D30iKdlqDt5ZmLEYhHELCMRj8b4sFAqrUcshIUvVP/s=
github.com/libp2p/go-libp2p-pubsub-router v0.6.0/go.mod h1:FY/q0/RBTKsLA7l4vqC2cbRbOvyDotg8PJQ7j8FDudE=
github.com/libp2p/go-libp2p-quic-transport v0.10.0/go.mod h1:RfJbZ8IqXIhxBRm5hqUEJqjiiY8xmEuq3HUDS993MkA=
Expand Down Expand Up @@ -1285,8 +1287,6 @@ github.com/whyrusleeping/mdns v0.0.0-20180901202407-ef14215e6b30/go.mod h1:j4l84
github.com/whyrusleeping/mdns v0.0.0-20190826153040-b9b60ed33aa9/go.mod h1:j4l84WPFclQPj320J9gp0XwNKBb3U0zt5CBqjPp22G4=
github.com/whyrusleeping/multiaddr-filter v0.0.0-20160516205228-e903e4adabd7 h1:E9S12nwJwEOXe2d6gT6qxdvqMnNq+VnSsKPgm2ZZNds=
github.com/whyrusleeping/multiaddr-filter v0.0.0-20160516205228-e903e4adabd7/go.mod h1:X2c0RVCI1eSUFI8eLcY3c0423ykwiUdxLJtkDvruhjI=
github.com/whyrusleeping/timecache v0.0.0-20160911033111-cfcb2f1abfee h1:lYbXeSvJi5zk5GLKVuid9TVjS9a0OmLIDKTfoZBL6Ow=
github.com/whyrusleeping/timecache v0.0.0-20160911033111-cfcb2f1abfee/go.mod h1:m2aV4LZI4Aez7dP5PMyVKEHhUyEJ/RjmPEDOpDvudHg=
github.com/x-cray/logrus-prefixed-formatter v0.5.2/go.mod h1:2duySbKsL6M18s5GU7VPsoEPHyzalCE06qoARUCeBBE=
github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI=
github.com/xdg-go/scram v1.0.2/go.mod h1:1WAq6h33pAW+iRreB34OORO2Nf7qel3VV3fjBj+hCSs=
Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ require (
github.com/libp2p/go-libp2p-http v0.4.0
github.com/libp2p/go-libp2p-kad-dht v0.20.0
github.com/libp2p/go-libp2p-kbucket v0.5.0
github.com/libp2p/go-libp2p-pubsub v0.8.2
github.com/libp2p/go-libp2p-pubsub v0.8.3
github.com/libp2p/go-libp2p-pubsub-router v0.6.0
github.com/libp2p/go-libp2p-record v0.2.0
github.com/libp2p/go-libp2p-routing-helpers v0.6.0
Expand Down Expand Up @@ -132,6 +132,7 @@ require (
github.com/dgraph-io/ristretto v0.0.2 // indirect
github.com/docker/go-units v0.5.0 // indirect
github.com/elastic/gosigar v0.14.2 // indirect
github.com/emirpasic/gods v1.18.1 // indirect
github.com/felixge/httpsnoop v1.0.2 // indirect
github.com/flynn/noise v1.0.0 // indirect
github.com/francoispqt/gojay v1.2.13 // indirect
Expand Down Expand Up @@ -223,7 +224,6 @@ require (
github.com/whyrusleeping/cbor-gen v0.0.0-20221220214510-0333c149dec0 // indirect
github.com/whyrusleeping/chunker v0.0.0-20181014151217-fe64bd25879f // indirect
github.com/whyrusleeping/go-keyspace v0.0.0-20160322163242-5b898ac5add1 // indirect
github.com/whyrusleeping/timecache v0.0.0-20160911033111-cfcb2f1abfee // indirect
go.opentelemetry.io/otel/exporters/otlp/internal/retry v1.7.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.7.0 // indirect
go.opentelemetry.io/otel/metric v0.30.0 // indirect
Expand Down
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,8 @@ github.com/elastic/gosigar v0.14.2 h1:Dg80n8cr90OZ7x+bAax/QjoW/XqTI11RmA79ZwIm9/
github.com/elastic/gosigar v0.14.2/go.mod h1:iXRIGg2tLnu7LBdpqzyQfGDEidKCfWcCMS0WKyPWoMs=
github.com/elgris/jsondiff v0.0.0-20160530203242-765b5c24c302 h1:QV0ZrfBLpFc2KDk+a4LJefDczXnonRwrYrQJY/9L4dA=
github.com/elgris/jsondiff v0.0.0-20160530203242-765b5c24c302/go.mod h1:qBlWZqWeVx9BjvqBsnC/8RUlAYpIFmPvgROcw0n1scE=
github.com/emirpasic/gods v1.18.1 h1:FXtiHYKDGKCW2KzwZKx0iC0PQmdlorYgdFG9jPXJ1Bc=
github.com/emirpasic/gods v1.18.1/go.mod h1:8tpGGwCnJ5H4r6BWwaV6OrWmMoPhUl5jm/FMNAnJvWQ=
github.com/envoyproxy/go-control-plane v0.6.9/go.mod h1:SBwIajubJHhxtWwsL9s8ss4safvEdbitLhGGK48rN6g=
github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
Expand Down Expand Up @@ -808,8 +810,8 @@ github.com/libp2p/go-libp2p-peerstore v0.2.2/go.mod h1:NQxhNjWxf1d4w6PihR8btWIRj
github.com/libp2p/go-libp2p-peerstore v0.2.6/go.mod h1:ss/TWTgHZTMpsU/oKVVPQCGuDHItOpf2W8RxAi50P2s=
github.com/libp2p/go-libp2p-peerstore v0.2.7/go.mod h1:ss/TWTgHZTMpsU/oKVVPQCGuDHItOpf2W8RxAi50P2s=
github.com/libp2p/go-libp2p-pnet v0.2.0/go.mod h1:Qqvq6JH/oMZGwqs3N1Fqhv8NVhrdYcO0BW4wssv21LA=
github.com/libp2p/go-libp2p-pubsub v0.8.2 h1:QLGUmkgKmwEVxVDYGsqc5t9CykOMY2Y21cXQHjR462I=
github.com/libp2p/go-libp2p-pubsub v0.8.2/go.mod h1:e4kT+DYjzPUYGZeWk4I+oxCSYTXizzXii5LDRRhjKSw=
github.com/libp2p/go-libp2p-pubsub v0.8.3 h1:T4+pcfcFm1K2v5oFyk68peSjVroaoM8zFygf6Y5WOww=
github.com/libp2p/go-libp2p-pubsub v0.8.3/go.mod h1:eje970FXxjhtFbVEoiae+VUw24ZoSlk67BsiZPLRzlw=
github.com/libp2p/go-libp2p-pubsub-router v0.6.0 h1:D30iKdlqDt5ZmLEYhHELCMRj8b4sFAqrUcshIUvVP/s=
github.com/libp2p/go-libp2p-pubsub-router v0.6.0/go.mod h1:FY/q0/RBTKsLA7l4vqC2cbRbOvyDotg8PJQ7j8FDudE=
github.com/libp2p/go-libp2p-quic-transport v0.10.0/go.mod h1:RfJbZ8IqXIhxBRm5hqUEJqjiiY8xmEuq3HUDS993MkA=
Expand Down Expand Up @@ -1344,8 +1346,6 @@ github.com/whyrusleeping/mdns v0.0.0-20180901202407-ef14215e6b30/go.mod h1:j4l84
github.com/whyrusleeping/mdns v0.0.0-20190826153040-b9b60ed33aa9/go.mod h1:j4l84WPFclQPj320J9gp0XwNKBb3U0zt5CBqjPp22G4=
github.com/whyrusleeping/multiaddr-filter v0.0.0-20160516205228-e903e4adabd7 h1:E9S12nwJwEOXe2d6gT6qxdvqMnNq+VnSsKPgm2ZZNds=
github.com/whyrusleeping/multiaddr-filter v0.0.0-20160516205228-e903e4adabd7/go.mod h1:X2c0RVCI1eSUFI8eLcY3c0423ykwiUdxLJtkDvruhjI=
github.com/whyrusleeping/timecache v0.0.0-20160911033111-cfcb2f1abfee h1:lYbXeSvJi5zk5GLKVuid9TVjS9a0OmLIDKTfoZBL6Ow=
github.com/whyrusleeping/timecache v0.0.0-20160911033111-cfcb2f1abfee/go.mod h1:m2aV4LZI4Aez7dP5PMyVKEHhUyEJ/RjmPEDOpDvudHg=
github.com/x-cray/logrus-prefixed-formatter v0.5.2/go.mod h1:2duySbKsL6M18s5GU7VPsoEPHyzalCE06qoARUCeBBE=
github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI=
github.com/xdg-go/scram v1.0.2/go.mod h1:1WAq6h33pAW+iRreB34OORO2Nf7qel3VV3fjBj+hCSs=
Expand Down
Loading

0 comments on commit 9652f24

Please sign in to comment.