Skip to content

Commit

Permalink
v12 chunk format handling in retention (#5254)
Browse files Browse the repository at this point in the history
Also add some tests and a safety mechanism to avoid dropping a whole table when we can't find any chunks
  • Loading branch information
sandeepsukhani authored Jan 27, 2022
1 parent 8646436 commit 9fdcacf
Show file tree
Hide file tree
Showing 5 changed files with 113 additions and 5 deletions.
19 changes: 15 additions & 4 deletions pkg/storage/stores/shipper/compactor/retention/index.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package retention

import (
"bytes"
"errors"
"fmt"
"strconv"
Expand Down Expand Up @@ -109,14 +110,24 @@ func parseChunkID(chunkID []byte) (userID []byte, hexFrom, hexThrough []byte, va
hex = chunkID[j+1:]
break
}

if len(userID) == 0 {
return nil, nil, nil, false
}
_, i = readOneHexPart(hex)
if i == 0 {
return nil, nil, nil, false

// v12+ chunk id format `<user>/<fprint>/<start>:<end>:<checksum>`
// older than v12 chunk id format `<user id>/<fingerprint>:<start time>:<end time>:<checksum>`
if idx := bytes.IndexByte(hex, '/'); idx != -1 {
// v12+ chunk id format, let us skip through the fingerprint using '/'
hex = hex[idx+1:]
} else {
// older than v12 chunk id format, let us skip through the fingerprint using ':'
_, i = readOneHexPart(hex)
if i == 0 {
return nil, nil, nil, false
}
hex = hex[i+1:]
}
hex = hex[i+1:]
hexFrom, i = readOneHexPart(hex)
if i == 0 {
return nil, nil, nil, false
Expand Down
61 changes: 60 additions & 1 deletion pkg/storage/stores/shipper/compactor/retention/index_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package retention

import (
"fmt"
"strconv"
"testing"
"time"

Expand All @@ -26,7 +27,8 @@ func Test_schemaPeriodForTable(t *testing.T) {
{"first table", schemaCfg, "index_" + indexFromTime(dayFromTime(start).Time.Time()), schemaCfg.Configs[0], true},
{"4 hour after first table", schemaCfg, "index_" + indexFromTime(dayFromTime(start).Time.Time().Add(4*time.Hour)), schemaCfg.Configs[0], true},
{"second schema", schemaCfg, "index_" + indexFromTime(dayFromTime(start.Add(28*time.Hour)).Time.Time()), schemaCfg.Configs[1], true},
{"now", schemaCfg, "index_" + indexFromTime(time.Now()), schemaCfg.Configs[2], true},
{"third schema", schemaCfg, "index_" + indexFromTime(dayFromTime(start.Add(75*time.Hour)).Time.Time()), schemaCfg.Configs[2], true},
{"now", schemaCfg, "index_" + indexFromTime(time.Now()), schemaCfg.Configs[3], true},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
Expand All @@ -36,3 +38,60 @@ func Test_schemaPeriodForTable(t *testing.T) {
})
}
}

// TestParseChunkID verifies parseChunkID against both the pre-v12 and the
// v12+ chunk ID layouts, and checks that a malformed ID is rejected.
func TestParseChunkID(t *testing.T) {
	// expectation describes the parsed components we expect back, or
	// valid=false when parsing should fail outright.
	type expectation struct {
		userID        string
		from, through int64
		valid         bool
	}
	testCases := []struct {
		name         string
		chunkID      string
		expectedResp expectation
	}{
		{
			name:    "older than v12 chunk format",
			chunkID: "fake/57f628c7f6d57aad:162c699f000:162c69a07eb:eb242d99",
			expectedResp: expectation{
				userID:  "fake",
				from:    1523750400000,
				through: 1523750406123,
				valid:   true,
			},
		},
		{
			name:    "v12+ chunk format",
			chunkID: "fake/57f628c7f6d57aad/162c699f000:162c69a07eb:eb242d99",
			expectedResp: expectation{
				userID:  "fake",
				from:    1523750400000,
				through: 1523750406123,
				valid:   true,
			},
		},
		{
			name:    "invalid format",
			chunkID: "fake:57f628c7f6d57aad:162c699f000:162c69a07eb:eb242d99",
			expectedResp: expectation{
				valid: false,
			},
		},
	}
	for _, tc := range testCases {
		tc := tc
		t.Run(tc.name, func(t *testing.T) {
			gotUserID, gotFrom, gotThrough, ok := parseChunkID([]byte(tc.chunkID))
			if !tc.expectedResp.valid {
				require.False(t, ok)
				return
			}
			require.Equal(t, tc.expectedResp.userID, string(gotUserID))

			// from/through are hex-encoded millisecond timestamps.
			fromMs, err := strconv.ParseInt(unsafeGetString(gotFrom), 16, 64)
			require.NoError(t, err)
			require.Equal(t, tc.expectedResp.from, fromMs)

			throughMs, err := strconv.ParseInt(unsafeGetString(gotThrough), 16, 64)
			require.NoError(t, err)
			require.Equal(t, tc.expectedResp.through, throughMs)
		})
	}
}
8 changes: 8 additions & 0 deletions pkg/storage/stores/shipper/compactor/retention/retention.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ const (
separator = "\000"
)

var errNoChunksFound = errors.New("no chunks found in table, please check if there are really no chunks and manually drop the table or " +
"see if there is a bug causing us to drop whole index table")

type TableMarker interface {
// MarkForDelete marks chunks to delete for a given table and returns if it's empty or modified.
MarkForDelete(ctx context.Context, tableName string, db *bbolt.DB) (bool, bool, error)
Expand Down Expand Up @@ -134,11 +137,13 @@ func markforDelete(ctx context.Context, tableName string, marker MarkerStorageWr
empty := true
modified := false
now := model.Now()
chunksFound := false

for chunkIt.Next() {
if chunkIt.Err() != nil {
return false, false, chunkIt.Err()
}
chunksFound = true
c := chunkIt.Entry()
seriesMap.Add(c.SeriesID, c.UserID, c.Labels)

Expand Down Expand Up @@ -190,6 +195,9 @@ func markforDelete(ctx context.Context, tableName string, marker MarkerStorageWr
empty = false
seriesMap.MarkSeriesNotDeleted(c.SeriesID, c.UserID)
}
if !chunksFound {
return false, false, errNoChunksFound
}
if empty {
return true, true, nil
}
Expand Down
18 changes: 18 additions & 0 deletions pkg/storage/stores/shipper/compactor/retention/retention_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/grafana/loki/pkg/storage/chunk/local"
"github.com/grafana/loki/pkg/storage/chunk/objectclient"
"github.com/grafana/loki/pkg/storage/chunk/storage"
"github.com/grafana/loki/pkg/storage/stores/shipper/util"
"github.com/grafana/loki/pkg/validation"
)

Expand Down Expand Up @@ -220,6 +221,23 @@ func Test_EmptyTable(t *testing.T) {
return nil
})
require.NoError(t, err)

// table with no chunks should error
tempDir := t.TempDir()
emptyDB, err := util.SafeOpenBoltdbFile(filepath.Join(tempDir, "empty.db"))
require.NoError(t, err)
err = emptyDB.Update(func(tx *bbolt.Tx) error {
bucket, err := tx.CreateBucket(local.IndexBucketName)
require.NoError(t, err)

it, err := newChunkIndexIterator(bucket, schema.config)
require.NoError(t, err)
_, _, err = markforDelete(context.Background(), tables[0].name, noopWriter{}, it, noopCleaner{},
NewExpirationChecker(&fakeLimits{}), nil)
require.Equal(t, err, errNoChunksFound)
return nil
})
require.NoError(t, err)
}

func createChunk(t testing.TB, userID string, lbs labels.Labels, from model.Time, through model.Time) chunk.Chunk {
Expand Down
12 changes: 12 additions & 0 deletions pkg/storage/stores/shipper/compactor/retention/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,17 @@ var (
},
RowShards: 16,
},
{
From: dayFromTime(start.Add(100 * time.Hour)),
IndexType: "boltdb",
ObjectType: "filesystem",
Schema: "v12",
IndexTables: chunk.PeriodicTableConfig{
Prefix: "index_",
Period: time.Hour * 24,
},
RowShards: 16,
},
},
},
}
Expand All @@ -88,6 +99,7 @@ var (
{"v9", schemaCfg.Configs[0].From.Time, schemaCfg.Configs[0]},
{"v10", schemaCfg.Configs[1].From.Time, schemaCfg.Configs[1]},
{"v11", schemaCfg.Configs[2].From.Time, schemaCfg.Configs[2]},
{"v12", schemaCfg.Configs[3].From.Time, schemaCfg.Configs[3]},
}

sweepMetrics = newSweeperMetrics(prometheus.DefaultRegisterer)
Expand Down

0 comments on commit 9fdcacf

Please sign in to comment.