From 973fef56e1e6fd6afc568ea3c1c2f654f7754d49 Mon Sep 17 00:00:00 2001
From: Mohsin Zaidi <2236875+smrz2001@users.noreply.github.com>
Date: Mon, 23 Jan 2023 19:02:44 -0500
Subject: [PATCH] feat: expire messages from the cache based on last seen time (#513)

* feat: expire messages from the cache based on last seen time

* chore: minor renaming

* fix: messages should not be found after expiration

* chore: editorial

* fix: use new time cache strategy consistently

* fix: default to old time cache and add todo for background gc

* fix: sweep expired entries before the duplicate check in FirstSeenCache.Add
---
 blacklist.go                       | 12 ++---
 go.mod                             |  6 +--
 go.sum                             |  4 +-
 pubsub.go                          | 24 +++++++--
 timecache/first_seen_cache.go      | 73 ++++++++++++++++++++++++++
 timecache/first_seen_cache_test.go | 55 ++++++++++++++++++++
 timecache/last_seen_cache.go       | 84 ++++++++++++++++++++++++++++++
 timecache/last_seen_cache_test.go  | 84 ++++++++++++++++++++++++++++++
 timecache/time_cache.go            | 32 ++++++++++++
 9 files changed, 355 insertions(+), 19 deletions(-)
 create mode 100644 timecache/first_seen_cache.go
 create mode 100644 timecache/first_seen_cache_test.go
 create mode 100644 timecache/last_seen_cache.go
 create mode 100644 timecache/last_seen_cache_test.go
 create mode 100644 timecache/time_cache.go

diff --git a/blacklist.go b/blacklist.go
index ecaafe8c..2d9bf252 100644
--- a/blacklist.go
+++ b/blacklist.go
@@ -1,11 +1,11 @@
 package pubsub
 
 import (
-	"sync"
 	"time"
 
 	"github.com/libp2p/go-libp2p/core/peer"
-	"github.com/whyrusleeping/timecache"
+
+	"github.com/libp2p/go-libp2p-pubsub/timecache"
 )
 
 // Blacklist is an interface for peer blacklisting.
@@ -34,8 +34,7 @@ func (b MapBlacklist) Contains(p peer.ID) bool {
 
 // TimeCachedBlacklist is a blacklist implementation using a time cache
 type TimeCachedBlacklist struct {
-	sync.RWMutex
-	tc *timecache.TimeCache
+	tc timecache.TimeCache
 }
 
 // NewTimeCachedBlacklist creates a new TimeCachedBlacklist with the given expiry duration
@@ -46,8 +45,6 @@ func NewTimeCachedBlacklist(expiry time.Duration) (Blacklist, error) {
 
 // Add returns a bool saying whether Add of peer was successful
 func (b *TimeCachedBlacklist) Add(p peer.ID) bool {
-	b.Lock()
-	defer b.Unlock()
 	s := p.String()
 	if b.tc.Has(s) {
 		return false
@@ -57,8 +54,5 @@ func (b *TimeCachedBlacklist) Add(p peer.ID) bool {
 }
 
 func (b *TimeCachedBlacklist) Contains(p peer.ID) bool {
-	b.RLock()
-	defer b.RUnlock()
-
 	return b.tc.Has(p.String())
 }
diff --git a/go.mod b/go.mod
index 231497f1..c0695a55 100644
--- a/go.mod
+++ b/go.mod
@@ -4,13 +4,15 @@ go 1.17
 
 require (
 	github.com/benbjohnson/clock v1.3.0
+	github.com/emirpasic/gods v1.18.1
 	github.com/gogo/protobuf v1.3.2
 	github.com/ipfs/go-log v1.0.5
+	github.com/libp2p/go-buffer-pool v0.1.0
 	github.com/libp2p/go-libp2p v0.22.0
 	github.com/libp2p/go-libp2p-testing v0.12.0
 	github.com/libp2p/go-msgio v0.2.0
 	github.com/multiformats/go-multiaddr v0.6.0
-	github.com/whyrusleeping/timecache v0.0.0-20160911033111-cfcb2f1abfee
+	github.com/multiformats/go-varint v0.0.6
 )
 
 require (
@@ -36,7 +38,6 @@ require (
 	github.com/jbenet/go-temp-err-catcher v0.1.0 // indirect
 	github.com/klauspost/compress v1.15.1 // indirect
 	github.com/klauspost/cpuid/v2 v2.1.0 // indirect
-	github.com/libp2p/go-buffer-pool v0.1.0 // indirect
 	github.com/libp2p/go-flow-metrics v0.1.0 // indirect
 	github.com/libp2p/go-netroute v0.2.0 // indirect
 	github.com/libp2p/go-openssl v0.1.0 // indirect
@@ -62,7 +63,6 @@ require (
 	github.com/multiformats/go-multicodec v0.5.0 // indirect
 	github.com/multiformats/go-multihash v0.2.1 // indirect
 	github.com/multiformats/go-multistream v0.3.3 // indirect
-	github.com/multiformats/go-varint v0.0.6 // indirect
 	github.com/nxadm/tail v1.4.8 // indirect
 	github.com/onsi/ginkgo v1.16.5 // indirect
 	github.com/opencontainers/runtime-spec v1.0.2 // indirect
diff --git a/go.sum b/go.sum
index 84d10209..3b40bead 100644
--- a/go.sum
+++ b/go.sum
@@ -125,6 +125,8 @@ github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25Kn
 github.com/elastic/gosigar v0.12.0/go.mod h1:iXRIGg2tLnu7LBdpqzyQfGDEidKCfWcCMS0WKyPWoMs=
 github.com/elastic/gosigar v0.14.2 h1:Dg80n8cr90OZ7x+bAax/QjoW/XqTI11RmA79ZwIm9/4=
 github.com/elastic/gosigar v0.14.2/go.mod h1:iXRIGg2tLnu7LBdpqzyQfGDEidKCfWcCMS0WKyPWoMs=
+github.com/emirpasic/gods v1.18.1 h1:FXtiHYKDGKCW2KzwZKx0iC0PQmdlorYgdFG9jPXJ1Bc=
+github.com/emirpasic/gods v1.18.1/go.mod h1:8tpGGwCnJ5H4r6BWwaV6OrWmMoPhUl5jm/FMNAnJvWQ=
 github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
 github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
 github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98=
@@ -544,8 +546,6 @@ github.com/ugorji/go/codec v0.0.0-20181204163529-d75b2dcb6bc8/go.mod h1:VFNgLljT
 github.com/urfave/cli v1.22.2/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0=
 github.com/viant/assertly v0.4.8/go.mod h1:aGifi++jvCrUaklKEKT0BU95igDNaqkvz+49uaYMPRU=
 github.com/viant/toolbox v0.24.0/go.mod h1:OxMCG57V0PXuIP2HNQrtJf2CjqdmbrOx5EkMILuUhzM=
-github.com/whyrusleeping/timecache v0.0.0-20160911033111-cfcb2f1abfee h1:lYbXeSvJi5zk5GLKVuid9TVjS9a0OmLIDKTfoZBL6Ow=
-github.com/whyrusleeping/timecache v0.0.0-20160911033111-cfcb2f1abfee/go.mod h1:m2aV4LZI4Aez7dP5PMyVKEHhUyEJ/RjmPEDOpDvudHg=
 github.com/xordataexchange/crypt v0.0.3-0.20170626215501-b2862e3d0a77/go.mod h1:aYKd//L2LvnjZzWKhF00oedf4jCCReLcmhLdhm1A27Q=
 github.com/yuin/goldmark v1.1.25/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
 github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
diff --git a/pubsub.go b/pubsub.go
index b94a1ecd..9bddf138 100644
--- a/pubsub.go
+++ b/pubsub.go
@@ -11,6 +11,7 @@ import (
 	"time"
 
 	pb "github.com/libp2p/go-libp2p-pubsub/pb"
+	"github.com/libp2p/go-libp2p-pubsub/timecache"
 
 	"github.com/libp2p/go-libp2p/core/crypto"
 	"github.com/libp2p/go-libp2p/core/discovery"
@@ -20,7 +21,6 @@ import (
 	"github.com/libp2p/go-libp2p/core/protocol"
 
 	logging "github.com/ipfs/go-log"
-	"github.com/whyrusleeping/timecache"
 )
 
 // DefaultMaximumMessageSize is 1mb.
@@ -31,6 +31,10 @@ var (
 	// Use WithSeenMessagesTTL to configure this per pubsub instance, instead of overriding the global default.
 	TimeCacheDuration = 120 * time.Second
 
+	// TimeCacheStrategy specifies which type of lookup/cleanup strategy is used by the seen messages cache.
+	// Use WithSeenMessagesStrategy to configure this per pubsub instance, instead of overriding the global default.
+	TimeCacheStrategy = timecache.Strategy_FirstSeen
+
 	// ErrSubscriptionCancelled may be returned when a subscription Next() is called after the
 	// subscription has been cancelled.
 	ErrSubscriptionCancelled = errors.New("subscription cancelled")
@@ -148,9 +152,10 @@ type PubSub struct {
 	inboundStreamsMx sync.Mutex
 	inboundStreams   map[peer.ID]network.Stream
 
-	seenMessagesMx sync.Mutex
-	seenMessages   *timecache.TimeCache
-	seenMsgTTL     time.Duration
+	seenMessagesMx  sync.Mutex
+	seenMessages    timecache.TimeCache
+	seenMsgTTL      time.Duration
+	seenMsgStrategy timecache.Strategy
 
 	// generator used to compute the ID for a message
 	idGen *msgIDGenerator
@@ -286,6 +291,7 @@ func NewPubSub(ctx context.Context, h host.Host, rt PubSubRouter, opts ...Option
 		blacklist:             NewMapBlacklist(),
 		blacklistPeer:         make(chan peer.ID),
 		seenMsgTTL:            TimeCacheDuration,
+		seenMsgStrategy:       TimeCacheStrategy,
 		idGen:                 newMsgIdGenerator(),
 		counter:               uint64(time.Now().UnixNano()),
 	}
@@ -307,7 +313,7 @@ func NewPubSub(ctx context.Context, h host.Host, rt PubSubRouter, opts ...Option
 		}
 	}
 
-	ps.seenMessages = timecache.NewTimeCache(ps.seenMsgTTL)
+	ps.seenMessages = timecache.NewTimeCacheWithStrategy(ps.seenMsgStrategy, ps.seenMsgTTL)
 
 	if err := ps.disc.Start(ps); err != nil {
 		return nil, err
@@ -533,6 +539,14 @@ func WithSeenMessagesTTL(ttl time.Duration) Option {
 	}
 }
 
+// WithSeenMessagesStrategy configures which type of lookup/cleanup strategy is used by the seen messages cache
+func WithSeenMessagesStrategy(strategy timecache.Strategy) Option {
+	return func(ps *PubSub) error {
+		ps.seenMsgStrategy = strategy
+		return nil
+	}
+}
+
 // WithAppSpecificRpcInspector sets a hook that inspect incomings RPCs prior to
 // processing them. The inspector is invoked on an accepted RPC just before it
 // is handled. If inspector's error is nil, the RPC is handled. Otherwise, it
diff --git a/timecache/first_seen_cache.go b/timecache/first_seen_cache.go
new file mode 100644
index 00000000..f8626aeb
--- /dev/null
+++ b/timecache/first_seen_cache.go
@@ -0,0 +1,73 @@
+package timecache
+
+import (
+	"container/list"
+	"sync"
+	"time"
+)
+
+// FirstSeenCache is a thread-safe copy of https://github.com/whyrusleeping/timecache.
+type FirstSeenCache struct {
+	q     *list.List
+	m     map[string]time.Time
+	span  time.Duration
+	guard *sync.RWMutex
+}
+
+func newFirstSeenCache(span time.Duration) TimeCache {
+	return &FirstSeenCache{
+		q:     list.New(),
+		m:     make(map[string]time.Time),
+		span:  span,
+		guard: new(sync.RWMutex),
+	}
+}
+
+func (tc FirstSeenCache) Add(s string) {
+	tc.guard.Lock()
+	defer tc.guard.Unlock()
+
+	// Sweep first: Has reports an expired entry as absent even though it stays in the map until the next sweep, so
+	// checking for duplicates before sweeping would panic when such an entry is legitimately re-added.
+	// TODO(#515): Do GC in the background
+	tc.sweep()
+
+	_, ok := tc.m[s]
+	if ok {
+		panic("putting the same entry twice not supported")
+	}
+
+	tc.m[s] = time.Now()
+	tc.q.PushFront(s)
+}
+
+func (tc FirstSeenCache) sweep() {
+	for {
+		back := tc.q.Back()
+		if back == nil {
+			return
+		}
+
+		v := back.Value.(string)
+		t, ok := tc.m[v]
+		if !ok {
+			panic("inconsistent cache state")
+		}
+
+		if time.Since(t) > tc.span {
+			tc.q.Remove(back)
+			delete(tc.m, v)
+		} else {
+			return
+		}
+	}
+}
+
+func (tc FirstSeenCache) Has(s string) bool {
+	tc.guard.RLock()
+	defer tc.guard.RUnlock()
+
+	ts, ok := tc.m[s]
+	// Only consider the entry found if it was present in the cache AND hadn't already expired.
+	return ok && time.Since(ts) <= tc.span
+}
diff --git a/timecache/first_seen_cache_test.go b/timecache/first_seen_cache_test.go
new file mode 100644
index 00000000..adacf8a9
--- /dev/null
+++ b/timecache/first_seen_cache_test.go
@@ -0,0 +1,55 @@
+package timecache
+
+import (
+	"fmt"
+	"testing"
+	"time"
+)
+
+func TestFirstSeenCacheFound(t *testing.T) {
+	tc := newFirstSeenCache(time.Minute)
+
+	tc.Add("test")
+
+	if !tc.Has("test") {
+		t.Fatal("should have this key")
+	}
+}
+
+func TestFirstSeenCacheExpire(t *testing.T) {
+	tc := newFirstSeenCache(time.Second)
+	for i := 0; i < 11; i++ {
+		tc.Add(fmt.Sprint(i))
+		time.Sleep(time.Millisecond * 100)
+	}
+
+	if tc.Has(fmt.Sprint(0)) {
+		t.Fatal("should have dropped this from the cache already")
+	}
+}
+
+func TestFirstSeenCacheNotFoundAfterExpire(t *testing.T) {
+	tc := newFirstSeenCache(time.Second)
+	tc.Add(fmt.Sprint(0))
+	time.Sleep(1100 * time.Millisecond)
+
+	if tc.Has(fmt.Sprint(0)) {
+		t.Fatal("should have dropped this from the cache already")
+	}
+}
+
+func TestFirstSeenCacheReAddAfterExpire(t *testing.T) {
+	tc := newFirstSeenCache(10 * time.Millisecond)
+	tc.Add("test")
+	time.Sleep(20 * time.Millisecond)
+
+	// The entry has expired but has not been swept yet, so re-adding it must not panic.
+	if tc.Has("test") {
+		t.Fatal("should have dropped this from the cache already")
+	}
+	tc.Add("test")
+
+	if !tc.Has("test") {
+		t.Fatal("should have this key")
+	}
+}
diff --git a/timecache/last_seen_cache.go b/timecache/last_seen_cache.go
new file mode 100644
index 00000000..daaa629f
--- /dev/null
+++ b/timecache/last_seen_cache.go
@@ -0,0 +1,84 @@
+package timecache
+
+import (
+	"sync"
+	"time"
+
+	"github.com/emirpasic/gods/maps/linkedhashmap"
+)
+
+// LastSeenCache is an LRU cache that keeps entries for up to a specified time duration. After this duration has elapsed,
+// "old" entries will be purged from the cache.
+//
+// It's also a "sliding window" cache. Every time an unexpired entry is seen again, its timestamp slides forward. This
+// keeps frequently occurring entries cached and prevents them from being propagated, especially because of network
+// issues that might increase the number of duplicate messages in the network.
+//
+// Garbage collection of expired entries is event-driven, i.e. it only happens when there is a new entry added to the
+// cache. This should be ok - if existing entries are being looked up then the cache is not growing, and when a new one
+// appears that would grow the cache, garbage collection will attempt to reduce the pressure on the cache.
+//
+// This implementation is heavily inspired by https://github.com/whyrusleeping/timecache.
+type LastSeenCache struct {
+	m     *linkedhashmap.Map
+	span  time.Duration
+	guard *sync.Mutex
+}
+
+func newLastSeenCache(span time.Duration) TimeCache {
+	return &LastSeenCache{
+		m:     linkedhashmap.New(),
+		span:  span,
+		guard: new(sync.Mutex),
+	}
+}
+
+func (tc *LastSeenCache) Add(s string) {
+	tc.guard.Lock()
+	defer tc.guard.Unlock()
+
+	tc.add(s)
+
+	// Garbage collect expired entries
+	// TODO(#515): Do GC in the background
+	tc.gc()
+}
+
+func (tc *LastSeenCache) add(s string) {
+	// We don't need a lock here because this function is always called with the lock already acquired.
+
+	// If an entry already exists, remove it and add a new one to the back of the list to maintain temporal ordering and
+	// an accurate sliding window.
+	tc.m.Remove(s)
+	now := time.Now()
+	tc.m.Put(s, &now)
+}
+
+func (tc *LastSeenCache) gc() {
+	// We don't need a lock here because this function is always called with the lock already acquired.
+	iter := tc.m.Iterator()
+	for iter.Next() {
+		key := iter.Key()
+		ts := iter.Value().(*time.Time)
+		// Exit if we've found an entry with an unexpired timestamp. Since we're iterating in order of insertion, all
+		// entries hereafter will be unexpired.
+		if time.Since(*ts) <= tc.span {
+			return
+		}
+		tc.m.Remove(key)
+	}
+}
+
+func (tc *LastSeenCache) Has(s string) bool {
+	tc.guard.Lock()
+	defer tc.guard.Unlock()
+
+	// If the entry exists and has not already expired, slide it forward.
+	if ts, found := tc.m.Get(s); found {
+		if t := ts.(*time.Time); time.Since(*t) <= tc.span {
+			tc.add(s)
+			return true
+		}
+	}
+	return false
+}
diff --git a/timecache/last_seen_cache_test.go b/timecache/last_seen_cache_test.go
new file mode 100644
index 00000000..49d0c068
--- /dev/null
+++ b/timecache/last_seen_cache_test.go
@@ -0,0 +1,84 @@
+package timecache
+
+import (
+	"fmt"
+	"testing"
+	"time"
+)
+
+func TestLastSeenCacheFound(t *testing.T) {
+	tc := newLastSeenCache(time.Minute)
+
+	tc.Add("test")
+
+	if !tc.Has("test") {
+		t.Fatal("should have this key")
+	}
+}
+
+func TestLastSeenCacheExpire(t *testing.T) {
+	tc := newLastSeenCache(time.Second)
+	for i := 0; i < 11; i++ {
+		tc.Add(fmt.Sprint(i))
+		time.Sleep(time.Millisecond * 100)
+	}
+
+	if tc.Has(fmt.Sprint(0)) {
+		t.Fatal("should have dropped this from the cache already")
+	}
+}
+
+func TestLastSeenCacheSlideForward(t *testing.T) {
+	tc := newLastSeenCache(time.Second)
+	i := 0
+
+	// T0ms: Add 8 entries with a 100ms sleep after each
+	for i < 8 {
+		tc.Add(fmt.Sprint(i))
+		time.Sleep(time.Millisecond * 100)
+		i++
+	}
+
+	// T800ms: Lookup the first entry - this should slide the entry forward so that its expiration is a full second
+	// later.
+	if !tc.Has(fmt.Sprint(0)) {
+		t.Fatal("should have this key")
+	}
+
+	// T800ms: Wait till after the first and second entries would have normally expired (had we not looked the first
+	// entry up).
+	time.Sleep(time.Millisecond * 400)
+
+	// T1200ms: The first entry should still be present in the cache - this will also slide the entry forward.
+	if !tc.Has(fmt.Sprint(0)) {
+		t.Fatal("should still have this key")
+	}
+
+	// T1200ms: The second entry should have expired
+	if tc.Has(fmt.Sprint(1)) {
+		t.Fatal("should have dropped this from the cache already")
+	}
+
+	// T1200ms: Sleep till the first entry actually expires
+	time.Sleep(time.Millisecond * 1100)
+
+	// T2300ms: Now the first entry should have expired
+	if tc.Has(fmt.Sprint(0)) {
+		t.Fatal("should have dropped this from the cache already")
+	}
+
+	// And it should not have been added back
+	if tc.Has(fmt.Sprint(0)) {
+		t.Fatal("should have dropped this from the cache already")
+	}
+}
+
+func TestLastSeenCacheNotFoundAfterExpire(t *testing.T) {
+	tc := newLastSeenCache(time.Second)
+	tc.Add(fmt.Sprint(0))
+	time.Sleep(1100 * time.Millisecond)
+
+	if tc.Has(fmt.Sprint(0)) {
+		t.Fatal("should have dropped this from the cache already")
+	}
+}
diff --git a/timecache/time_cache.go b/timecache/time_cache.go
new file mode 100644
index 00000000..f10ff214
--- /dev/null
+++ b/timecache/time_cache.go
@@ -0,0 +1,32 @@
+package timecache
+
+import "time"
+
+type Strategy uint8
+
+const (
+	Strategy_FirstSeen Strategy = iota
+	Strategy_LastSeen
+)
+
+type TimeCache interface {
+	Add(string)
+	Has(string) bool
+}
+
+// NewTimeCache defaults to the original ("first seen") cache implementation
+func NewTimeCache(span time.Duration) TimeCache {
+	return NewTimeCacheWithStrategy(Strategy_FirstSeen, span)
+}
+
+func NewTimeCacheWithStrategy(strategy Strategy, span time.Duration) TimeCache {
+	switch strategy {
+	case Strategy_FirstSeen:
+		return newFirstSeenCache(span)
+	case Strategy_LastSeen:
+		return newLastSeenCache(span)
+	default:
+		// Default to the original time cache implementation
+		return newFirstSeenCache(span)
+	}
+}