diff --git a/pkg/storage/stores/shipper/compactor/retention/expiration.go b/pkg/storage/stores/shipper/compactor/retention/expiration.go index 5cf9c9fdf2d7c..2d508a2c51af4 100644 --- a/pkg/storage/stores/shipper/compactor/retention/expiration.go +++ b/pkg/storage/stores/shipper/compactor/retention/expiration.go @@ -1,11 +1,15 @@ package retention import ( + "fmt" "time" + "github.com/go-kit/kit/log/level" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/pkg/labels" + util_log "github.com/cortexproject/cortex/pkg/util/log" + "github.com/grafana/loki/pkg/validation" ) @@ -18,8 +22,8 @@ type ExpirationChecker interface { } type expirationChecker struct { - tenantsRetention *TenantsRetention - earliestRetentionStartTime model.Time + tenantsRetention *TenantsRetention + latestRetentionStartTime model.Time } type Limits interface { @@ -43,14 +47,16 @@ func (e *expirationChecker) Expired(ref ChunkEntry, now model.Time) (bool, []mod } func (e *expirationChecker) MarkPhaseStarted() { - e.earliestRetentionStartTime = model.Now().Add(-findHighestRetentionPeriod(e.tenantsRetention.limits)) + smallestRetentionPeriod := findSmallestRetentionPeriod(e.tenantsRetention.limits) + e.latestRetentionStartTime = model.Now().Add(-smallestRetentionPeriod) + level.Info(util_log.Logger).Log("msg", fmt.Sprintf("smallest retention period %v", smallestRetentionPeriod)) } func (e *expirationChecker) MarkPhaseFailed() {} func (e *expirationChecker) MarkPhaseFinished() {} func (e *expirationChecker) IntervalHasExpiredChunks(interval model.Interval) bool { - return interval.Start.Before(e.earliestRetentionStartTime) + return interval.Start.Before(e.latestRetentionStartTime) } type TenantsRetention struct { @@ -97,26 +103,26 @@ Outer: return globalRetention } -func findHighestRetentionPeriod(limits Limits) time.Duration { +func findSmallestRetentionPeriod(limits Limits) time.Duration { defaultLimits := limits.DefaultLimits() - highestRetentionPeriod := defaultLimits.RetentionPeriod + smallestRetentionPeriod := defaultLimits.RetentionPeriod for _, streamRetention := range defaultLimits.StreamRetention { - if streamRetention.Period > highestRetentionPeriod { - highestRetentionPeriod = streamRetention.Period + if streamRetention.Period < smallestRetentionPeriod { + smallestRetentionPeriod = streamRetention.Period } } limits.ForEachTenantLimit(func(userID string, limit *validation.Limits) { - if limit.RetentionPeriod > highestRetentionPeriod { - highestRetentionPeriod = limit.RetentionPeriod + if limit.RetentionPeriod < smallestRetentionPeriod { + smallestRetentionPeriod = limit.RetentionPeriod } for _, streamRetention := range limit.StreamRetention { - if streamRetention.Period > highestRetentionPeriod { - highestRetentionPeriod = streamRetention.Period + if streamRetention.Period < smallestRetentionPeriod { + smallestRetentionPeriod = streamRetention.Period } } }) - return time.Duration(highestRetentionPeriod) + return time.Duration(smallestRetentionPeriod) } diff --git a/pkg/storage/stores/shipper/compactor/retention/expiration_test.go b/pkg/storage/stores/shipper/compactor/retention/expiration_test.go index d86b602e1a84e..a615ec2b34e33 100644 --- a/pkg/storage/stores/shipper/compactor/retention/expiration_test.go +++ b/pkg/storage/stores/shipper/compactor/retention/expiration_test.go @@ -86,7 +86,7 @@ func Test_expirationChecker_Expired(t *testing.T) { } } -func TestFindEarliestRetentionStartTime(t *testing.T) { +func TestFindLatestRetentionStartTime(t *testing.T) { const dayDuration = 24 * time.Hour for _, tc := range []struct { name string @@ -103,43 +103,43 @@ func TestFindEarliestRetentionStartTime(t *testing.T) { expectedEarliestRetentionStartTime: 7 * dayDuration, }, { - name: "default retention period highest", + name: "default retention period smallest", limit: fakeLimits{ defaultLimit: retentionLimit{ retentionPeriod: 7 * dayDuration, streamRetention: []validation.StreamRetention{ { - Period: model.Duration(dayDuration), + Period: model.Duration(10 * dayDuration), }, }, }, perTenant: map[string]retentionLimit{ - "0": {retentionPeriod: 2 * dayDuration}, - "1": {retentionPeriod: 5 * dayDuration}, + "0": {retentionPeriod: 12 * dayDuration}, + "1": {retentionPeriod: 15 * dayDuration}, }, }, expectedEarliestRetentionStartTime: 7 * dayDuration, }, { - name: "default stream retention period highest", + name: "default stream retention period smallest", limit: fakeLimits{ defaultLimit: retentionLimit{ retentionPeriod: 7 * dayDuration, streamRetention: []validation.StreamRetention{ { - Period: model.Duration(10 * dayDuration), + Period: model.Duration(3 * dayDuration), }, }, }, perTenant: map[string]retentionLimit{ - "0": {retentionPeriod: 2 * dayDuration}, + "0": {retentionPeriod: 7 * dayDuration}, "1": {retentionPeriod: 5 * dayDuration}, }, }, - expectedEarliestRetentionStartTime: 10 * dayDuration, + expectedEarliestRetentionStartTime: 3 * dayDuration, }, { - name: "user retention retention period highest", + name: "user retention retention period smallest", limit: fakeLimits{ defaultLimit: retentionLimit{ retentionPeriod: 7 * dayDuration, @@ -168,10 +168,10 @@ func TestFindEarliestRetentionStartTime(t *testing.T) { }, }, }, - expectedEarliestRetentionStartTime: 20 * dayDuration, + expectedEarliestRetentionStartTime: 5 * dayDuration, }, { - name: "user stream retention period highest", + name: "user stream retention period smallest", limit: fakeLimits{ defaultLimit: retentionLimit{ retentionPeriod: 7 * dayDuration, @@ -191,20 +191,20 @@ func TestFindEarliestRetentionStartTime(t *testing.T) { }, }, "1": { - retentionPeriod: 5 * dayDuration, + retentionPeriod: 15 * dayDuration, streamRetention: []validation.StreamRetention{ { - Period: model.Duration(25 * dayDuration), + Period: model.Duration(2 * dayDuration), }, }, }, }, }, - expectedEarliestRetentionStartTime: 25 * dayDuration, + expectedEarliestRetentionStartTime: 2 * dayDuration, }, } { t.Run(tc.name, func(t *testing.T) { - require.Equal(t, tc.expectedEarliestRetentionStartTime, findHighestRetentionPeriod(tc.limit)) + require.Equal(t, tc.expectedEarliestRetentionStartTime, findSmallestRetentionPeriod(tc.limit)) }) } } @@ -219,7 +219,7 @@ func TestExpirationChecker_IntervalHasExpiredChunks(t *testing.T) { { name: "not expired", expirationChecker: expirationChecker{ - earliestRetentionStartTime: model.Now().Add(-24 * time.Hour), + latestRetentionStartTime: model.Now().Add(-24 * time.Hour), }, interval: model.Interval{ Start: model.Now().Add(-time.Hour), @@ -229,7 +229,7 @@ func TestExpirationChecker_IntervalHasExpiredChunks(t *testing.T) { { name: "partially expired", expirationChecker: expirationChecker{ - earliestRetentionStartTime: model.Now().Add(-24 * time.Hour), + latestRetentionStartTime: model.Now().Add(-24 * time.Hour), }, interval: model.Interval{ Start: model.Now().Add(-25 * time.Hour), @@ -240,7 +240,7 @@ func TestExpirationChecker_IntervalHasExpiredChunks(t *testing.T) { { name: "fully expired", expirationChecker: expirationChecker{ - earliestRetentionStartTime: model.Now().Add(-24 * time.Hour), + latestRetentionStartTime: model.Now().Add(-24 * time.Hour), }, interval: model.Interval{ Start: model.Now().Add(-26 * time.Hour),