Skip to content

Commit

Permalink
fix finding tables which would have out of retention data (#4107)
Browse files Browse the repository at this point in the history
  • Loading branch information
sandeepsukhani authored Aug 6, 2021
1 parent c3aaebe commit 67fe2d6
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 32 deletions.
32 changes: 19 additions & 13 deletions pkg/storage/stores/shipper/compactor/retention/expiration.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
package retention

import (
"fmt"
"time"

"github.com/go-kit/kit/log/level"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/pkg/labels"

util_log "github.com/cortexproject/cortex/pkg/util/log"

"github.com/grafana/loki/pkg/validation"
)

Expand All @@ -18,8 +22,8 @@ type ExpirationChecker interface {
}

type expirationChecker struct {
tenantsRetention *TenantsRetention
earliestRetentionStartTime model.Time
tenantsRetention *TenantsRetention
latestRetentionStartTime model.Time
}

type Limits interface {
Expand All @@ -43,14 +47,16 @@ func (e *expirationChecker) Expired(ref ChunkEntry, now model.Time) (bool, []mod
}

func (e *expirationChecker) MarkPhaseStarted() {
e.earliestRetentionStartTime = model.Now().Add(-findHighestRetentionPeriod(e.tenantsRetention.limits))
smallestRetentionPeriod := findSmallestRetentionPeriod(e.tenantsRetention.limits)
e.latestRetentionStartTime = model.Now().Add(-smallestRetentionPeriod)
level.Info(util_log.Logger).Log("msg", fmt.Sprintf("smallest retention period %v", smallestRetentionPeriod))
}

func (e *expirationChecker) MarkPhaseFailed() {}
func (e *expirationChecker) MarkPhaseFinished() {}

func (e *expirationChecker) IntervalHasExpiredChunks(interval model.Interval) bool {
return interval.Start.Before(e.earliestRetentionStartTime)
return interval.Start.Before(e.latestRetentionStartTime)
}

type TenantsRetention struct {
Expand Down Expand Up @@ -97,26 +103,26 @@ Outer:
return globalRetention
}

func findHighestRetentionPeriod(limits Limits) time.Duration {
func findSmallestRetentionPeriod(limits Limits) time.Duration {
defaultLimits := limits.DefaultLimits()

highestRetentionPeriod := defaultLimits.RetentionPeriod
smallestRetentionPeriod := defaultLimits.RetentionPeriod
for _, streamRetention := range defaultLimits.StreamRetention {
if streamRetention.Period > highestRetentionPeriod {
highestRetentionPeriod = streamRetention.Period
if streamRetention.Period < smallestRetentionPeriod {
smallestRetentionPeriod = streamRetention.Period
}
}

limits.ForEachTenantLimit(func(userID string, limit *validation.Limits) {
if limit.RetentionPeriod > highestRetentionPeriod {
highestRetentionPeriod = limit.RetentionPeriod
if limit.RetentionPeriod < smallestRetentionPeriod {
smallestRetentionPeriod = limit.RetentionPeriod
}
for _, streamRetention := range limit.StreamRetention {
if streamRetention.Period > highestRetentionPeriod {
highestRetentionPeriod = streamRetention.Period
if streamRetention.Period < smallestRetentionPeriod {
smallestRetentionPeriod = streamRetention.Period
}
}
})

return time.Duration(highestRetentionPeriod)
return time.Duration(smallestRetentionPeriod)
}
38 changes: 19 additions & 19 deletions pkg/storage/stores/shipper/compactor/retention/expiration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func Test_expirationChecker_Expired(t *testing.T) {
}
}

func TestFindEarliestRetentionStartTime(t *testing.T) {
func TestFindLatestRetentionStartTime(t *testing.T) {
const dayDuration = 24 * time.Hour
for _, tc := range []struct {
name string
Expand All @@ -103,43 +103,43 @@ func TestFindEarliestRetentionStartTime(t *testing.T) {
expectedEarliestRetentionStartTime: 7 * dayDuration,
},
{
name: "default retention period highest",
name: "default retention period smallest",
limit: fakeLimits{
defaultLimit: retentionLimit{
retentionPeriod: 7 * dayDuration,
streamRetention: []validation.StreamRetention{
{
Period: model.Duration(dayDuration),
Period: model.Duration(10 * dayDuration),
},
},
},
perTenant: map[string]retentionLimit{
"0": {retentionPeriod: 2 * dayDuration},
"1": {retentionPeriod: 5 * dayDuration},
"0": {retentionPeriod: 12 * dayDuration},
"1": {retentionPeriod: 15 * dayDuration},
},
},
expectedEarliestRetentionStartTime: 7 * dayDuration,
},
{
name: "default stream retention period highest",
name: "default stream retention period smallest",
limit: fakeLimits{
defaultLimit: retentionLimit{
retentionPeriod: 7 * dayDuration,
streamRetention: []validation.StreamRetention{
{
Period: model.Duration(10 * dayDuration),
Period: model.Duration(3 * dayDuration),
},
},
},
perTenant: map[string]retentionLimit{
"0": {retentionPeriod: 2 * dayDuration},
"0": {retentionPeriod: 7 * dayDuration},
"1": {retentionPeriod: 5 * dayDuration},
},
},
expectedEarliestRetentionStartTime: 10 * dayDuration,
expectedEarliestRetentionStartTime: 3 * dayDuration,
},
{
name: "user retention retention period highest",
name: "user retention retention period smallest",
limit: fakeLimits{
defaultLimit: retentionLimit{
retentionPeriod: 7 * dayDuration,
Expand Down Expand Up @@ -168,10 +168,10 @@ func TestFindEarliestRetentionStartTime(t *testing.T) {
},
},
},
expectedEarliestRetentionStartTime: 20 * dayDuration,
expectedEarliestRetentionStartTime: 5 * dayDuration,
},
{
name: "user stream retention period highest",
name: "user stream retention period smallest",
limit: fakeLimits{
defaultLimit: retentionLimit{
retentionPeriod: 7 * dayDuration,
Expand All @@ -191,20 +191,20 @@ func TestFindEarliestRetentionStartTime(t *testing.T) {
},
},
"1": {
retentionPeriod: 5 * dayDuration,
retentionPeriod: 15 * dayDuration,
streamRetention: []validation.StreamRetention{
{
Period: model.Duration(25 * dayDuration),
Period: model.Duration(2 * dayDuration),
},
},
},
},
},
expectedEarliestRetentionStartTime: 25 * dayDuration,
expectedEarliestRetentionStartTime: 2 * dayDuration,
},
} {
t.Run(tc.name, func(t *testing.T) {
require.Equal(t, tc.expectedEarliestRetentionStartTime, findHighestRetentionPeriod(tc.limit))
require.Equal(t, tc.expectedEarliestRetentionStartTime, findSmallestRetentionPeriod(tc.limit))
})
}
}
Expand All @@ -219,7 +219,7 @@ func TestExpirationChecker_IntervalHasExpiredChunks(t *testing.T) {
{
name: "not expired",
expirationChecker: expirationChecker{
earliestRetentionStartTime: model.Now().Add(-24 * time.Hour),
latestRetentionStartTime: model.Now().Add(-24 * time.Hour),
},
interval: model.Interval{
Start: model.Now().Add(-time.Hour),
Expand All @@ -229,7 +229,7 @@ func TestExpirationChecker_IntervalHasExpiredChunks(t *testing.T) {
{
name: "partially expired",
expirationChecker: expirationChecker{
earliestRetentionStartTime: model.Now().Add(-24 * time.Hour),
latestRetentionStartTime: model.Now().Add(-24 * time.Hour),
},
interval: model.Interval{
Start: model.Now().Add(-25 * time.Hour),
Expand All @@ -240,7 +240,7 @@ func TestExpirationChecker_IntervalHasExpiredChunks(t *testing.T) {
{
name: "fully expired",
expirationChecker: expirationChecker{
earliestRetentionStartTime: model.Now().Add(-24 * time.Hour),
latestRetentionStartTime: model.Now().Add(-24 * time.Hour),
},
interval: model.Interval{
Start: model.Now().Add(-26 * time.Hour),
Expand Down

0 comments on commit 67fe2d6

Please sign in to comment.