Chunktool add index-validate and index-clean commands #104

Merged: 21 commits, Oct 25, 2020
1 change: 1 addition & 0 deletions .errcheck-exclude
@@ -1 +1,2 @@
(github.com/mitchellh/colorstring).Println
(github.com/go-kit/kit/log.Logger).Log
8 changes: 4 additions & 4 deletions .github/workflows/validate_pr.yml
@@ -19,20 +19,20 @@ jobs:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- name: Set up Go 1.13
- name: Set up Go 1.14
uses: actions/setup-go@v1
with:
go-version: 1.13
go-version: 1.14
- name: Unit Tests
run: make test
build:
name: Build Binaries
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- name: Set up Go 1.13
- name: Set up Go 1.14
uses: actions/setup-go@v1
with:
go-version: 1.13
go-version: 1.14
- name: Build All
run: make all
5 changes: 5 additions & 0 deletions CHANGELOG.md
@@ -1,5 +1,10 @@
# Changelog

## v0.4.1

* [ENHANCEMENT] Upgrade the Go version used in build images and tests to Go 1.14.9 to match upstream Cortex. #104
* [FEATURE] Add the `chunktool chunk validate-index` and `chunktool chunk clean-index` commands, which scan Cortex index backends for invalid index entries. #104

## v0.4.0

* [ENHANCEMENT] Loadgen: Allow users to selectively disable query or write loadgen by leaving their respective URL configs empty. #95
2 changes: 1 addition & 1 deletion Makefile
@@ -46,7 +46,7 @@ cross:
CGO_ENABLED=0 gox -output="dist/{{.Dir}}-{{.OS}}-{{.Arch}}" -ldflags=${LDFLAGS} -arch="amd64" -os="linux windows darwin" ./cmd/logtool

test:
go test -mod=vendor -p=8 ./...
go test -mod=vendor -p=8 ./pkg/...

clean:
rm -rf cmd/cortextool/cortextool
4 changes: 4 additions & 0 deletions README.md
@@ -122,6 +122,10 @@ The migrate command helps with migrating chunks across cortex clusters. It also
As of now, it only supports `Bigtable` or `GCS` as a source to read chunks from for migration, while for writing it supports all the storages that Cortex supports.
More details about it [here](./pkg/chunk/migrate/README.md)

##### Chunk Validate/Clean-Index

The `chunk validate-index` and `chunk clean-index` commands allow users to scan their index and chunk backends for invalid entries. The `validate-index` command finds invalid entries and outputs them to a CSV file. The `clean-index` command takes that CSV file as input and deletes the invalid entries.
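For reference, each row in that CSV is written as `<index hash>,0x<hex-encoded range value>` (see `pkg/chunk/cassandra/scanner.go` in this PR). The snippet below is a minimal sketch of parsing such a row; the `parseInvalidEntry` helper and the sample values are illustrative only and not part of chunktool.

```go
package main

import (
	"encoding/hex"
	"fmt"
	"strings"
)

// parseInvalidEntry is a hypothetical helper that splits one CSV row of the
// form "<hash>,0x<hex range value>" into its hash and decoded range value.
func parseInvalidEntry(line string) (hash string, rangeValue []byte, err error) {
	parts := strings.SplitN(strings.TrimSpace(line), ",", 2)
	if len(parts) != 2 || !strings.HasPrefix(parts[1], "0x") {
		return "", nil, fmt.Errorf("malformed entry: %q", line)
	}
	rangeValue, err = hex.DecodeString(strings.TrimPrefix(parts[1], "0x"))
	if err != nil {
		return "", nil, err
	}
	return parts[0], rangeValue, nil
}

func main() {
	// Sample values are made up for demonstration.
	hash, rv, err := parseInvalidEntry("example-hash,0x6368756e6b")
	if err != nil {
		panic(err)
	}
	fmt.Printf("hash=%s range=%q\n", hash, rv)
}
```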

## logtool

A CLI tool to parse Cortex query-frontend logs and format them for easy analysis.
2 changes: 1 addition & 1 deletion cmd/chunktool/Dockerfile
@@ -1,4 +1,4 @@
FROM golang:1.13.12-stretch as build
FROM golang:1.14.9-stretch as build
ARG GOARCH="amd64"
COPY . /build_dir
WORKDIR /build_dir
2 changes: 1 addition & 1 deletion cmd/chunktool/main.go
@@ -15,7 +15,7 @@ var (

func main() {
kingpin.Version("0.0.1")
app := kingpin.New("cortextool", "A command-line tool to manage cortex chunk backends.")
app := kingpin.New("chunktool", "A command-line tool to manage cortex chunk backends.")
logConfig.Register(app)
commands.RegisterChunkCommands(app)
pushGateway.Register(app)
2 changes: 1 addition & 1 deletion cmd/cortextool/Dockerfile
@@ -1,4 +1,4 @@
FROM golang:1.13.12-stretch as build
FROM golang:1.14.9-stretch as build
ARG GOARCH="amd64"
COPY . /build_dir
WORKDIR /build_dir
2 changes: 1 addition & 1 deletion cmd/logtool/Dockerfile
@@ -1,4 +1,4 @@
FROM golang:1.13.12-stretch as build
FROM golang:1.14.9-stretch as build
ARG GOARCH="amd64"
COPY . /build_dir
WORKDIR /build_dir
6 changes: 5 additions & 1 deletion go.mod
@@ -1,6 +1,6 @@
module github.com/grafana/cortex-tools

go 1.13
go 1.14

require (
cloud.google.com/go/bigtable v1.2.0
@@ -9,6 +9,8 @@ require (
github.com/alecthomas/repr v0.0.0-20181024024818-d37bc2a10ba1 // indirect
github.com/cortexproject/cortex v1.3.1-0.20200923132904-22f2efdc1339
github.com/dlclark/regexp2 v1.2.0 // indirect
github.com/go-kit/kit v0.10.0
github.com/gocql/gocql v0.0.0-20200526081602-cd04bd7f22a7
github.com/gogo/protobuf v1.3.1
github.com/golang/snappy v0.0.1
github.com/google/go-github/v32 v32.1.0
@@ -22,8 +24,10 @@
github.com/prometheus/prometheus v1.8.2-0.20200819132913-cb830b0a9c78
github.com/sirupsen/logrus v1.6.0
github.com/stretchr/testify v1.6.1
golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208
google.golang.org/api v0.29.0
gopkg.in/alecthomas/kingpin.v2 v2.2.6
gopkg.in/yaml.v2 v2.3.0
gopkg.in/yaml.v3 v3.0.0-20200615113413-eeeca48fe776
gotest.tools v2.2.0+incompatible
)
5 changes: 5 additions & 0 deletions go.sum
@@ -325,6 +325,7 @@ github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5Kwzbycv
github.com/fatih/color v1.9.0 h1:8xPHl4/q1VyqGIPif1F+1V3Y3lSmrq01EabUW3CoW5s=
github.com/fatih/color v1.9.0/go.mod h1:eQcE1qtQxscV5RaZvpXrrb8Drkc3/DdQ+uUYCNjL+zU=
github.com/fatih/structtag v1.1.0/go.mod h1:mBJUNpUnHmRKrKlQQlmCrh5PuhftFbNv8Ys4/aAZl94=
github.com/felixge/fgprof v0.9.1 h1:E6FUJ2Mlv043ipLOCFqo8+cHo9MhQ203E2cdEK/isEs=
github.com/felixge/fgprof v0.9.1/go.mod h1:7/HK6JFtFaARhIljgP2IV8rJLIoHDoOYoUphsnGvqxE=
github.com/felixge/httpsnoop v1.0.1 h1:lvB5Jl89CsZtGIWuTcDM1E/vkVs49/Ml7JJe07l8SPQ=
github.com/felixge/httpsnoop v1.0.1/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U=
@@ -740,6 +741,7 @@ github.com/klauspost/compress v1.4.0/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0
github.com/klauspost/compress v1.9.5/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0guNDohfE1A=
github.com/klauspost/cpuid v0.0.0-20170728055534-ae7887de9fa5/go.mod h1:Pj4uuM528wm8OyEC2QMXAi2YiTZ96dNQPGgoMS4s3ek=
github.com/klauspost/cpuid v1.2.3/go.mod h1:Pj4uuM528wm8OyEC2QMXAi2YiTZ96dNQPGgoMS4s3ek=
github.com/klauspost/cpuid v1.3.1 h1:5JNjFYYQrZeKRJ0734q51WCEEn2huer72Dc7K+R/b6s=
github.com/klauspost/cpuid v1.3.1/go.mod h1:bYW4mA6ZgKPob1/Dlai2LviZJO7KGI3uoWLd42rAQw4=
github.com/klauspost/crc32 v0.0.0-20161016154125-cb6bfca970f6/go.mod h1:+ZoRqAPRLkC4NPOvfYeR5KNOrY6TD+/sAC3HXPZgDYg=
github.com/klauspost/pgzip v1.0.2-0.20170402124221-0bf5dcad4ada/go.mod h1:Ch1tH69qFZu15pkjo5kYi6mth2Zzwzt50oCQKQE9RUs=
@@ -1043,10 +1045,12 @@ github.com/sergi/go-diff v1.0.0 h1:Kpca3qRNrduNnOQeazBd0ysaKrUJiIuISHxogkT9RPQ=
github.com/sergi/go-diff v1.0.0/go.mod h1:0CfEIISq7TuYL3j771MWULgwwjU+GofnZX9QAmXWZgo=
github.com/shopspring/decimal v0.0.0-20180709203117-cd690d0c9e24/go.mod h1:M+9NzErvs504Cn4c5DxATwIqPbtswREoFCre64PpcG4=
github.com/shurcooL/httpfs v0.0.0-20171119174359-809beceb2371/go.mod h1:ZY1cvUeJuFPAdZ/B6v7RHavJWZn2YPVFQ1OSXhCGOkg=
github.com/shurcooL/httpfs v0.0.0-20190707220628-8d4bc4ba7749 h1:bUGsEnyNbVPw06Bs80sCeARAlK8lhwqGyi6UT8ymuGk=
github.com/shurcooL/httpfs v0.0.0-20190707220628-8d4bc4ba7749/go.mod h1:ZY1cvUeJuFPAdZ/B6v7RHavJWZn2YPVFQ1OSXhCGOkg=
github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc=
github.com/shurcooL/vfsgen v0.0.0-20180825020608-02ddb050ef6b/go.mod h1:TrYk7fJVaAttu97ZZKrO9UbRa8izdowaMIZcxYMbVaw=
github.com/shurcooL/vfsgen v0.0.0-20181202132449-6a9ea43bcacd/go.mod h1:TrYk7fJVaAttu97ZZKrO9UbRa8izdowaMIZcxYMbVaw=
github.com/shurcooL/vfsgen v0.0.0-20200627165143-92b8a710ab6c h1:XLPw6rny9Vrrvrzhw8pNLrC2+x/kH0a/3gOx5xWDa6Y=
github.com/shurcooL/vfsgen v0.0.0-20200627165143-92b8a710ab6c/go.mod h1:TrYk7fJVaAttu97ZZKrO9UbRa8izdowaMIZcxYMbVaw=
github.com/siebenmann/go-kstat v0.0.0-20160321171754-d34789b79745/go.mod h1:G81aIFAMS9ECrwBYR9YxhlPjWgrItd+Kje78O6+uqm8=
github.com/sirupsen/logrus v1.0.5/go.mod h1:pMByvHTf9Beacp5x1UXfOR9xyW/9antXMhjMPG0dEzc=
@@ -1565,6 +1569,7 @@ gopkg.in/inf.v0 v0.9.1 h1:73M5CoZyi3ZLMOyDlQh031Cx6N9NDJ2Vvfl76EDAgDc=
gopkg.in/inf.v0 v0.9.1/go.mod h1:cWUDdTG/fYaXco+Dcufb5Vnc6Gp2YChqWtbxRZE0mXw=
gopkg.in/ini.v1 v1.42.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k=
gopkg.in/ini.v1 v1.51.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k=
gopkg.in/ini.v1 v1.57.0 h1:9unxIsFcTt4I55uWluz+UmL95q4kdJ0buvQ1ZIqVQww=
gopkg.in/ini.v1 v1.57.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k=
gopkg.in/resty.v1 v1.12.0/go.mod h1:mDo4pnntr5jdWRML875a/NmxYqAlA73dVijT2AXvQQo=
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ=
180 changes: 180 additions & 0 deletions pkg/chunk/cassandra/scanner.go
@@ -0,0 +1,180 @@
package cassandra

import (
"context"
"fmt"
"sync"

"github.com/cortexproject/cortex/pkg/chunk"
"github.com/cortexproject/cortex/pkg/chunk/cassandra"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/sirupsen/logrus"
"go.uber.org/atomic"
)

// scanBatch represents a batch of rows read from Cassandra.
type scanBatch struct {
hash []byte
rangeValue []byte
value []byte
}

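// IndexValidator scans a Cassandra-backed Cortex index and flags entries whose chunks are missing from the chunk table.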
type IndexValidator struct {
schema chunk.SchemaConfig
s *StorageClient
o *ObjectClient
tenantID string

totalIgnoredTime *atomic.Int64
totalInvalid *atomic.Int64
}

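// NewIndexValidator connects to Cassandra and builds the object and storage clients needed to validate index entries for the given tenant.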
func NewIndexValidator(
cfg cassandra.Config,
schema chunk.SchemaConfig,
tenantID string,
) (*IndexValidator, error) {
logrus.Debug("Connecting to Cassandra")
o, err := NewObjectClient(
cfg,
schema,
prometheus.NewRegistry(),
)
if err != nil {
return nil, err
}

s, err := NewStorageClient(
cfg,
schema,
prometheus.NewRegistry(),
)
if err != nil {
return nil, err
}

logrus.Debug("Connected")
return &IndexValidator{
schema: schema,
s: s,
o: o,
tenantID: tenantID,
totalIgnoredTime: atomic.NewInt64(0),
totalInvalid: atomic.NewInt64(0),
}, nil
}

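// Stop shuts down the validator's storage client.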
func (i *IndexValidator) Stop() {
i.s.Stop()
}

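// IndexScan reads every row of the given index table, validates entries whose chunks fall inside the [from, to] window, and sends each invalid entry to out as a CSV-formatted line.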
func (i *IndexValidator) IndexScan(ctx context.Context, table string, from model.Time, to model.Time, out chan string) error {
q := i.s.readSession.Query(fmt.Sprintf("SELECT hash, range, value FROM %s", table))

iter := q.WithContext(ctx).Iter()
defer iter.Close()
scanner := iter.Scanner()

wg := &sync.WaitGroup{}
batchChan := make(chan scanBatch, 1000)

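// Fan out to 64 workers so chunk-existence checks run concurrently with the table scan.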
for n := 0; n < 64; n++ {
wg.Add(1)
go func() {
defer wg.Done()
for b := range batchChan {
i.checkEntry(ctx, from, to, out, b)
}
}()
}

rowsReadTotal := 0

logrus.WithFields(logrus.Fields{
"table": table,
"from_ts": from.String(),
"to_ts": to.String(),
}).Infoln("starting scan")

for scanner.Next() {
b := scanBatch{}
if err := scanner.Scan(&b.hash, &b.rangeValue, &b.value); err != nil {
return errors.WithStack(err)
}
batchChan <- b
rowsReadTotal++
if rowsReadTotal%25000 == 0 {
logrus.WithFields(logrus.Fields{
"entries_scanned": rowsReadTotal,
"entries_outside_range_skipped": i.totalIgnoredTime.Load(),
"entries_invalid_found": i.totalInvalid.Load(),
}).Infoln("scan progress")
}
}
close(batchChan)
wg.Wait()
return errors.WithStack(scanner.Err())
}

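// checkEntry verifies that the chunk referenced by a single index entry exists in its chunk table; entries pointing at missing chunks are counted and emitted on out.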
func (i *IndexValidator) checkEntry(
ctx context.Context,
from model.Time,
to model.Time,
out chan string,
entry scanBatch,
) {
chunkID, _, isSeriesID, err := parseChunkTimeRangeValue(entry.rangeValue, entry.value)
if err != nil {
logrus.WithField("chunk_id", chunkID).WithError(err).Errorln("unable to parse chunk time range value")
return
}

if isSeriesID {
logrus.WithField("series_id", chunkID).Debugln("ignoring series id row")
return
}

c, err := chunk.ParseExternalKey(i.tenantID, chunkID)
if err != nil {
logrus.WithField("chunk_id", chunkID).WithError(err).Errorln("unable to parse external key")
return
}

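// Skip chunks entirely outside the requested time range; to == 0 disables the upper bound.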
if from > c.Through || (c.From > to && to > 0) {
i.totalIgnoredTime.Inc()
logrus.WithField("chunk_id", chunkID).Debugln("ignoring chunk outside time range")
return
}

chunkTable, err := i.schema.ChunkTableFor(c.From)
if err != nil {
logrus.WithFields(logrus.Fields{
"chunk_id": chunkID,
"from": c.From.String(),
"through": c.Through.String(),
}).WithError(err).Errorln("unable to determine chunk table")
return
}

var count int
err = i.o.readSession.Query(
fmt.Sprintf("SELECT count(*) FROM %s WHERE hash = ?", chunkTable),
c.ExternalKey(),
).WithContext(ctx).Scan(&count)

if err != nil {
logrus.WithFields(logrus.Fields{
"chunk_id": chunkID,
}).WithError(err).Errorln("unable to read chunk table")
return
}

chunkExists := count > 0
if !chunkExists {
i.totalInvalid.Inc()
logrus.WithField("chunk_id", chunkID).Infoln("chunk not found, adding index entry to output file")
out <- fmt.Sprintf("%s,0x%x\n", string(entry.hash), entry.rangeValue)
}
}