diff --git a/nflog/nflog.go b/nflog/nflog.go index 1b65fb1b46..d879118584 100644 --- a/nflog/nflog.go +++ b/nflog/nflog.go @@ -226,7 +226,10 @@ func (s state) clone() state { // merge returns true or false whether the MeshEntry was merged or // not. This information is used to decide to gossip the message further. -func (s state) merge(e *pb.MeshEntry) bool { +func (s state) merge(e *pb.MeshEntry, now time.Time) bool { + if e.ExpiresAt.Before(now) { + return false + } k := stateKey(string(e.Entry.GroupKey), e.Entry.Receiver) prev, ok := s[k] @@ -411,7 +414,7 @@ func (l *Log) Log(r *pb.Receiver, gkey string, firingAlerts, resolvedAlerts []ui if err != nil { return err } - l.st.merge(e) + l.st.merge(e, l.now()) l.broadcast(b) return nil @@ -522,9 +525,10 @@ func (l *Log) Merge(b []byte) error { } l.mtx.Lock() defer l.mtx.Unlock() + now := l.now() for _, e := range st { - if merged := l.st.merge(e); merged && !cluster.OversizedMessage(b) { + if merged := l.st.merge(e, now); merged && !cluster.OversizedMessage(b) { // If this is the first we've seen the message and it's // not oversized, gossip it to other nodes. We don't // propagate oversized messages because they're sent to diff --git a/nflog/nflog_test.go b/nflog/nflog_test.go index 417af1a8f1..bb1579da17 100644 --- a/nflog/nflog_test.go +++ b/nflog/nflog_test.go @@ -156,7 +156,7 @@ func TestStateMerge(t *testing.T) { // We only care about key names and timestamps for the // merging logic. - newEntry := func(ts time.Time, name string) *pb.MeshEntry { + newEntry := func(name string, ts, exp time.Time) *pb.MeshEntry { return &pb.MeshEntry{ Entry: &pb.Entry{ Timestamp: ts, @@ -167,28 +167,33 @@ func TestStateMerge(t *testing.T) { Integration: "integr", }, }, + ExpiresAt: exp, } } + + exp := now.Add(time.Minute) + cases := []struct { a, b state final state }{ { a: state{ - "key:a1/integr/1": newEntry(now, "a1"), - "key:a2/integr/1": newEntry(now, "a2"), - "key:a3/integr/1": newEntry(now, "a3"), + "key:a1/integr/1": newEntry("a1", now, exp), + "key:a2/integr/1": newEntry("a2", now, exp), + "key:a3/integr/1": newEntry("a3", now, exp), }, b: state{ - "key:b1/integr/1": newEntry(now, "b1"), // new key, should be added - "key:a2/integr/1": newEntry(now.Add(-time.Minute), "a2"), // older timestamp, should be dropped - "key:a3/integr/1": newEntry(now.Add(time.Minute), "a3"), // newer timestamp, should overwrite + "key:b1/integr/1": newEntry("b1", now, exp), // new key, should be added + "key:b2/integr/1": newEntry("b2", now.Add(-time.Minute), now.Add(-time.Millisecond)), // new key, expired, should not be added + "key:a2/integr/1": newEntry("a2", now.Add(-time.Minute), exp), // older timestamp, should be dropped + "key:a3/integr/1": newEntry("a3", now.Add(time.Minute), exp), // newer timestamp, should overwrite }, final: state{ - "key:a1/integr/1": newEntry(now, "a1"), - "key:a2/integr/1": newEntry(now, "a2"), - "key:a3/integr/1": newEntry(now.Add(time.Minute), "a3"), - "key:b1/integr/1": newEntry(now, "b1"), + "key:a1/integr/1": newEntry("a1", now, exp), + "key:a2/integr/1": newEntry("a2", now, exp), + "key:a3/integr/1": newEntry("a3", now.Add(time.Minute), exp), + "key:b1/integr/1": newEntry("b1", now, exp), }, }, } @@ -198,7 +203,7 @@ func TestStateMerge(t *testing.T) { res := c.a.clone() for _, e := range cb { - res.merge(e) + res.merge(e, now) } require.Equal(t, c.final, res, "Merge result should match expectation") require.Equal(t, c.b, cb, "Merged state should remain unmodified") @@ -264,7 +269,7 @@ func TestStateDataCoding(t *testing.T) { } func TestQuery(t *testing.T) { - nl, err := New() + nl, err := New(WithRetention(time.Second)) if err != nil { require.NoError(t, err, "constructing nflog failed") } diff --git a/silence/silence.go b/silence/silence.go index 7940ae1c45..4b44817082 100644 --- a/silence/silence.go +++ b/silence/silence.go @@ -403,7 +403,7 @@ func (s *Silences) setSilence(sil *pb.Silence) error { return err } - s.st.merge(msil) + s.st.merge(msil, s.now()) s.broadcast(b) return nil @@ -717,8 +717,10 @@ func (s *Silences) Merge(b []byte) error { s.mtx.Lock() defer s.mtx.Unlock() + now := s.now() + for _, e := range st { - if merged := s.st.merge(e); merged && !cluster.OversizedMessage(b) { + if merged := s.st.merge(e, now); merged && !cluster.OversizedMessage(b) { // If this is the first we've seen the message and it's // not oversized, gossip it to other nodes. We don't // propagate oversized messages because they're sent to @@ -739,7 +741,10 @@ func (s *Silences) SetBroadcast(f func([]byte)) { type state map[string]*pb.MeshSilence -func (s state) merge(e *pb.MeshSilence) bool { +func (s state) merge(e *pb.MeshSilence, now time.Time) bool { + if e.ExpiresAt.Before(now) { + return false + } // Comments list was moved to a single comment. Apply upgrade // on silences received from peers. if len(e.Silence.Comments) > 0 { diff --git a/silence/silence_test.go b/silence/silence_test.go index febdaf0c25..e27f493a00 100644 --- a/silence/silence_test.go +++ b/silence/silence_test.go @@ -988,39 +988,43 @@ func TestStateMerge(t *testing.T) { // We only care about key names and timestamps for the // merging logic. - newSilence := func(id string, ts time.Time) *pb.MeshSilence { + newSilence := func(id string, ts, exp time.Time) *pb.MeshSilence { return &pb.MeshSilence{ - Silence: &pb.Silence{Id: id, UpdatedAt: ts}, + Silence: &pb.Silence{Id: id, UpdatedAt: ts}, + ExpiresAt: exp, } } + exp := now.Add(time.Minute) + cases := []struct { a, b state final state }{ { a: state{ - "a1": newSilence("a1", now), - "a2": newSilence("a2", now), - "a3": newSilence("a3", now), + "a1": newSilence("a1", now, exp), + "a2": newSilence("a2", now, exp), + "a3": newSilence("a3", now, exp), }, b: state{ - "b1": newSilence("b1", now), // new key, should be added - "a2": newSilence("a2", now.Add(-time.Minute)), // older timestamp, should be dropped - "a3": newSilence("a3", now.Add(time.Minute)), // newer timestamp, should overwrite + "b1": newSilence("b1", now, exp), // new key, should be added + "a2": newSilence("a2", now.Add(-time.Minute), exp), // older timestamp, should be dropped + "a3": newSilence("a3", now.Add(time.Minute), exp), // newer timestamp, should overwrite + "a4": newSilence("a4", now.Add(-time.Minute), now.Add(-time.Millisecond)), // new key, expired, should not be added }, final: state{ - "a1": newSilence("a1", now), - "a2": newSilence("a2", now), - "a3": newSilence("a3", now.Add(time.Minute)), - "b1": newSilence("b1", now), + "a1": newSilence("a1", now, exp), + "a2": newSilence("a2", now, exp), + "a3": newSilence("a3", now.Add(time.Minute), exp), + "b1": newSilence("b1", now, exp), }, }, } for _, c := range cases { for _, e := range c.b { - c.a.merge(e) + c.a.merge(e, now) } require.Equal(t, c.final, c.a, "Merge result should match expectation")