Merge branch 'master' into fix-46778
Defined2014 committed Sep 11, 2023
2 parents 15899e9 + d3d30f5 commit 3ddc49c
Showing 27 changed files with 2,158 additions and 1,743 deletions.

.codecov.yml (18 additions, 10 deletions)

@@ -8,15 +8,23 @@ coverage:
   round: down
   range: "65...90"
 
+# status:
+#   project:
+#     default:
+#       threshold: 0.2 #Allow the coverage to drop by threshold%, and posting a success status.
+#   patch:
+#     default:
+#       target: 0% # trial operation
+
+# Ref: https://docs.codecov.com/docs/commit-status#disabling-a-status
 status:
-  project:
-    default:
-      threshold: 0.2 #Allow the coverage to drop by threshold%, and posting a success status.
-  patch:
-    default:
-      target: 0% # trial operation
+  project: off # disable it
+  patch: off # disable it
+  changes: no
 
+# Ref: https://docs.codecov.com/docs/github-checks#disabling-github-checks-completely-via-yaml
+github_checks: false
 
 parsers:
   gcov:
     branch_detection:
@@ -43,7 +51,7 @@ component_management:
     - component_id: component_parser
      name: parser
       paths:
-        - parser/**
+        - parser/**
     - component_id: component_br
       name: br
       paths:
@@ -52,7 +60,7 @@ component_management:
 
 flag_management:
   default_rules: # the rules that will be followed for any flag added, generally
-    carryforward: false
+    carryforward: true
    statuses:
      - type: project
        target: auto
@@ -61,11 +69,11 @@ flag_management:
 
 ignore:
   - "LICENSES"
-  - "*_test.go"
+  - "*_test.go"
   - ".git"
   - "*.yml"
   - "*.md"
-  - "cmd/.*"
+  - "cmd/.*"
   - "docs/.*"
   - "vendor/.*"
   - "ddl/failtest/.*"
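
Note: with this change Codecov stops posting commit statuses (project and patch are switched from a threshold/target to off, plus changes: no) and GitHub checks (github_checks: false), following the two referenced docs pages, and carryforward: true lets a flag reuse its previously uploaded coverage on commits that upload nothing new for it. The paired, identical-looking "- parser/**", "*_test.go", and "cmd/.*" lines apparently differ only in whitespace, which the rendered diff does not show.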

br/pkg/lightning/backend/external/BUILD.bazel (1 addition, 1 deletion)

@@ -53,7 +53,7 @@ go_test(
     ],
     embed = [":external"],
     flaky = True,
-    shard_count = 28,
+    shard_count = 31,
     deps = [
         "//br/pkg/lightning/backend/kv",
         "//br/pkg/lightning/common",
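
Note: shard_count splits this go_test target into parallel Bazel shards, and tidb appears to keep it in step with the number of test functions in the package; the merge brings in new tests (TestGetMaxOverlapping below among them), hence the bump from 28 to 31.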

br/pkg/lightning/backend/external/util.go (48 additions, 0 deletions)

@@ -19,6 +19,7 @@ import (
     "context"
     "fmt"
     "path/filepath"
+    "slices"
     "sort"
     "strings"
 
@@ -199,3 +200,50 @@ func MockExternalEngineWithWriter(
     }
     return GetAllFileNames(ctx, storage, subDir)
 }
+
+// EndpointTp is the type of Endpoint.Key.
+type EndpointTp int
+
+const (
+    // ExclusiveEnd represents "..., Endpoint.Key)".
+    ExclusiveEnd EndpointTp = iota
+    // InclusiveStart represents "[Endpoint.Key, ...".
+    InclusiveStart
+    // InclusiveEnd represents "..., Endpoint.Key]".
+    InclusiveEnd
+)
+
+// Endpoint represents an endpoint of an interval which can be used by GetMaxOverlapping.
+type Endpoint struct {
+    Key    []byte
+    Tp     EndpointTp
+    Weight uint64 // all EndpointTp use positive weight
+}
+
+// GetMaxOverlapping returns the maximum overlapping weight treating given
+// `points` as endpoints of intervals. `points` are not required to be sorted,
+// and will be sorted in-place in this function.
+func GetMaxOverlapping(points []Endpoint) int {
+    slices.SortFunc(points, func(i, j Endpoint) int {
+        if cmp := bytes.Compare(i.Key, j.Key); cmp != 0 {
+            return cmp
+        }
+        return int(i.Tp) - int(j.Tp)
+    })
+    var maxWeight uint64
+    var curWeight uint64
+    for _, p := range points {
+        switch p.Tp {
+        case InclusiveStart:
+            curWeight += p.Weight
+        case ExclusiveEnd:
+            curWeight -= p.Weight
+        case InclusiveEnd:
+            curWeight -= p.Weight
+        }
+        if curWeight > maxWeight {
+            maxWeight = curWeight
+        }
+    }
+    return int(maxWeight)
+}
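
GetMaxOverlapping is a textbook sweep line: sort the 2n interval endpoints, add a start's weight as the sweep passes it, subtract an end's weight, and track the running maximum. The EndpointTp values are deliberately ordered so that, at equal keys, an ExclusiveEnd sorts before an InclusiveStart (half-open intervals that merely touch do not overlap) while an InclusiveEnd sorts after it (closed intervals sharing a key do). The tests in util_test.go below exercise exactly these cases; here is a minimal usage sketch, not part of the commit, where the import path is an assumption based on this revision's module layout:

package main

import (
    "fmt"

    "github.com/pingcap/tidb/br/pkg/lightning/backend/external"
)

func main() {
    // Endpoints of three half-open key ranges: [1, 3), [2, 4), [3, 5).
    // [1, 3) and [3, 5) merely touch at key 3, so no single key is
    // covered by more than two ranges.
    points := []external.Endpoint{
        {Key: []byte{1}, Tp: external.InclusiveStart, Weight: 1},
        {Key: []byte{3}, Tp: external.ExclusiveEnd, Weight: 1},
        {Key: []byte{2}, Tp: external.InclusiveStart, Weight: 1},
        {Key: []byte{4}, Tp: external.ExclusiveEnd, Weight: 1},
        {Key: []byte{3}, Tp: external.InclusiveStart, Weight: 1},
        {Key: []byte{5}, Tp: external.ExclusiveEnd, Weight: 1},
    }
    fmt.Println(external.GetMaxOverlapping(points)) // prints 2
}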

br/pkg/lightning/backend/external/util_test.go (31 additions, 0 deletions)

@@ -209,3 +209,34 @@ func TestCleanUpFiles(t *testing.T) {
     require.Equal(t, []string(nil), statFiles)
     require.Equal(t, []string(nil), dataFiles)
 }
+
+func TestGetMaxOverlapping(t *testing.T) {
+    // [1, 3), [2, 4)
+    points := []Endpoint{
+        {Key: []byte{1}, Tp: InclusiveStart, Weight: 1},
+        {Key: []byte{3}, Tp: ExclusiveEnd, Weight: 1},
+        {Key: []byte{2}, Tp: InclusiveStart, Weight: 1},
+        {Key: []byte{4}, Tp: ExclusiveEnd, Weight: 1},
+    }
+    require.Equal(t, 2, GetMaxOverlapping(points))
+    // [1, 3), [2, 4), [3, 5)
+    points = []Endpoint{
+        {Key: []byte{1}, Tp: InclusiveStart, Weight: 1},
+        {Key: []byte{3}, Tp: ExclusiveEnd, Weight: 1},
+        {Key: []byte{2}, Tp: InclusiveStart, Weight: 1},
+        {Key: []byte{4}, Tp: ExclusiveEnd, Weight: 1},
+        {Key: []byte{3}, Tp: InclusiveStart, Weight: 1},
+        {Key: []byte{5}, Tp: ExclusiveEnd, Weight: 1},
+    }
+    require.Equal(t, 2, GetMaxOverlapping(points))
+    // [1, 3], [2, 4], [3, 5]
+    points = []Endpoint{
+        {Key: []byte{1}, Tp: InclusiveStart, Weight: 1},
+        {Key: []byte{3}, Tp: InclusiveEnd, Weight: 1},
+        {Key: []byte{2}, Tp: InclusiveStart, Weight: 1},
+        {Key: []byte{4}, Tp: InclusiveEnd, Weight: 1},
+        {Key: []byte{3}, Tp: InclusiveStart, Weight: 1},
+        {Key: []byte{5}, Tp: InclusiveEnd, Weight: 1},
+    }
+    require.Equal(t, 3, GetMaxOverlapping(points))
+}
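
Note that the second and third cases differ only in the endpoint types at the shared keys: with ExclusiveEnd the sweep closes [1, 3) before it opens [3, 5) at key 3, so the expected maximum stays 2, while with InclusiveEnd all of [1, 3], [2, 4], and [3, 5] are simultaneously open at key 3, giving 3.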

br/pkg/lightning/backend/external/writer.go (91 additions, 19 deletions)

@@ -36,6 +36,8 @@ import (
     "go.uber.org/zap"
 )
 
+var multiFileStatNum = 500
+
 // rangePropertiesCollector collects range properties for each range. The zero
 // value of rangePropertiesCollector is not ready to use, should call reset()
 // first.
@@ -59,11 +61,12 @@ func (rc *rangePropertiesCollector) encode() []byte {
 
 // WriterSummary is the summary of a writer.
 type WriterSummary struct {
-    WriterID  int
-    Seq       int
-    Min       tidbkv.Key
-    Max       tidbkv.Key
-    TotalSize uint64
+    WriterID           int
+    Seq                int
+    Min                tidbkv.Key
+    Max                tidbkv.Key
+    TotalSize          uint64
+    MultipleFilesStats []MultipleFilesStat
 }
 
 // OnCloseFunc is the callback function when a writer is closed.
@@ -155,7 +158,7 @@ func (b *WriterBuilder) Build(
     if b.dupeDetectEnabled {
         keyAdapter = common.DupDetectKeyAdapter{}
     }
-    return &Writer{
+    ret := &Writer{
         rc: &rangePropertiesCollector{
             props:    make([]*rangeProperty, 0, 1024),
             currProp: &rangeProperty{},
@@ -173,7 +176,47 @@ func (b *WriterBuilder) Build(
         kvStore: nil,
         onClose: b.onClose,
         closed:  false,
+        multiFileStats: make([]MultipleFilesStat, 1),
+        fileMinKeys:    make([]tidbkv.Key, 0, multiFileStatNum),
+        fileMaxKeys:    make([]tidbkv.Key, 0, multiFileStatNum),
     }
+    ret.multiFileStats[0].Filenames = make([][2]string, 0, multiFileStatNum)
+    return ret
 }
+
+// MultipleFilesStat is the statistic information of multiple files (currently
+// every 500 files). It is used to estimate the data overlapping, and per-file
+// statistic information maybe too big to loaded into memory.
+type MultipleFilesStat struct {
+    MinKey            tidbkv.Key
+    MaxKey            tidbkv.Key
+    Filenames         [][2]string // [dataFile, statFile]
+    MaxOverlappingNum int
+}
+
+func (m *MultipleFilesStat) build(startKeys, endKeys []tidbkv.Key) {
+    if len(startKeys) == 0 {
+        return
+    }
+    m.MinKey = startKeys[0]
+    m.MaxKey = endKeys[0]
+    for i := 1; i < len(startKeys); i++ {
+        if m.MinKey.Cmp(startKeys[i]) > 0 {
+            m.MinKey = startKeys[i]
+        }
+        if m.MaxKey.Cmp(endKeys[i]) < 0 {
+            m.MaxKey = endKeys[i]
+        }
+    }
+
+    points := make([]Endpoint, 0, len(startKeys)*2)
+    for _, k := range startKeys {
+        points = append(points, Endpoint{Key: k, Tp: InclusiveStart, Weight: 1})
+    }
+    for _, k := range endKeys {
+        points = append(points, Endpoint{Key: k, Tp: InclusiveEnd, Weight: 1})
+    }
+    m.MaxOverlappingNum = GetMaxOverlapping(points)
+}
@@ -198,6 +241,11 @@ type Writer struct {
     // Statistic information per batch.
     batchSize uint64
 
+    // Statistic information per 500 batches.
+    multiFileStats []MultipleFilesStat
+    fileMinKeys    []tidbkv.Key
+    fileMaxKeys    []tidbkv.Key
+
     // Statistic information per writer.
     minKey tidbkv.Key
     maxKey tidbkv.Key
@@ -218,7 +266,7 @@ func (w *Writer) AppendRows(ctx context.Context, _ []string, rows encode.Rows) error {
 
     w.writeBatch = append(w.writeBatch, common.KvPair{Key: key, Val: val})
     if w.batchSize >= w.memSizeLimit {
-        if err := w.flushKVs(ctx); err != nil {
+        if err := w.flushKVs(ctx, false); err != nil {
             return err
         }
     }
@@ -238,10 +286,12 @@ func (w *Writer) Close(ctx context.Context) (backend.ChunkFlushStatus, error) {
     }
     w.closed = true
     defer w.kvBuffer.Destroy()
-    err := w.flushKVs(ctx)
+    err := w.flushKVs(ctx, true)
     if err != nil {
         return status(false), err
     }
+    // remove the trailing empty MultipleFilesStat
+    w.multiFileStats = w.multiFileStats[:len(w.multiFileStats)-1]
 
     logutil.Logger(ctx).Info("close writer",
         zap.Int("writerID", w.writerID),
@@ -251,11 +301,12 @@ func (w *Writer) Close(ctx context.Context) (backend.ChunkFlushStatus, error) {
     w.writeBatch = nil
 
     w.onClose(&WriterSummary{
-        WriterID:  w.writerID,
-        Seq:       w.currentSeq,
-        Min:       w.minKey,
-        Max:       w.maxKey,
-        TotalSize: w.totalSize,
+        WriterID:           w.writerID,
+        Seq:                w.currentSeq,
+        Min:                w.minKey,
+        Max:                w.maxKey,
+        TotalSize:          w.totalSize,
+        MultipleFilesStats: w.multiFileStats,
     })
     return status(true), nil
 }
@@ -277,13 +328,13 @@ func (s status) Flushed() bool {
     return bool(s)
 }
 
-func (w *Writer) flushKVs(ctx context.Context) (err error) {
+func (w *Writer) flushKVs(ctx context.Context, fromClose bool) (err error) {
     if len(w.writeBatch) == 0 {
         return nil
     }
 
     logger := logutil.Logger(ctx)
-    dataWriter, statWriter, err := w.createStorageWriter(ctx)
+    dataFile, statFile, dataWriter, statWriter, err := w.createStorageWriter(ctx)
     if err != nil {
         return err
     }
@@ -339,6 +390,23 @@ func (w *Writer) flushKVs(ctx context.Context) (err error) {
 
     w.recordMinMax(w.writeBatch[0].Key, w.writeBatch[len(w.writeBatch)-1].Key, kvSize)
 
+    // maintain 500-batch statistics
+
+    l := len(w.multiFileStats)
+    w.multiFileStats[l-1].Filenames = append(w.multiFileStats[l-1].Filenames,
+        [2]string{dataFile, statFile},
+    )
+    w.fileMinKeys = append(w.fileMinKeys, tidbkv.Key(w.writeBatch[0].Key).Clone())
+    w.fileMaxKeys = append(w.fileMaxKeys, tidbkv.Key(w.writeBatch[len(w.writeBatch)-1].Key).Clone())
+    if fromClose || len(w.multiFileStats[l-1].Filenames) == multiFileStatNum {
+        w.multiFileStats[l-1].build(w.fileMinKeys, w.fileMaxKeys)
+        w.multiFileStats = append(w.multiFileStats, MultipleFilesStat{
+            Filenames: make([][2]string, 0, multiFileStatNum),
+        })
+        w.fileMinKeys = w.fileMinKeys[:0]
+        w.fileMaxKeys = w.fileMaxKeys[:0]
+    }
+
     w.writeBatch = w.writeBatch[:0]
     w.rc.reset()
     w.kvBuffer.Reset()
@@ -347,16 +415,20 @@ func (w *Writer) flushKVs(ctx context.Context) (err error) {
     return nil
 }
 
-func (w *Writer) createStorageWriter(ctx context.Context) (data, stats storage.ExternalFileWriter, err error) {
+func (w *Writer) createStorageWriter(ctx context.Context) (
+    dataFile, statFile string,
+    data, stats storage.ExternalFileWriter,
+    err error,
+) {
     dataPath := filepath.Join(w.filenamePrefix, strconv.Itoa(w.currentSeq))
     dataWriter, err := w.store.Create(ctx, dataPath, nil)
     if err != nil {
-        return nil, nil, err
+        return "", "", nil, nil, err
     }
     statPath := filepath.Join(w.filenamePrefix+statSuffix, strconv.Itoa(w.currentSeq))
    statsWriter, err := w.store.Create(ctx, statPath, nil)
     if err != nil {
-        return nil, nil, err
+        return "", "", nil, nil, err
     }
-    return dataWriter, statsWriter, nil
+    return dataPath, statPath, dataWriter, statsWriter, nil
 }
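
For every flushed file pair, flushKVs now records the data/stat file names plus the batch's min and max key, and every multiFileStatNum (500) files, or on the final flush from Close, MultipleFilesStat.build collapses those per-file key ranges into a single min/max plus a maximum-overlap count. A sketch, not part of the commit, of the quantity build computes; maxOverlapOfFiles is a hypothetical helper mirroring build's endpoint construction, and the import path assumes this revision's module layout:

package main

import (
    "fmt"

    "github.com/pingcap/tidb/br/pkg/lightning/backend/external"
)

// Each flushed file i covers the closed key range [starts[i], ends[i]].
// Like MultipleFilesStat.build, this treats the ranges as closed intervals
// (InclusiveStart/InclusiveEnd), so files whose ranges merely touch at a
// single key still count as overlapping.
func maxOverlapOfFiles(starts, ends [][]byte) int {
    points := make([]external.Endpoint, 0, len(starts)*2)
    for _, k := range starts {
        points = append(points, external.Endpoint{Key: k, Tp: external.InclusiveStart, Weight: 1})
    }
    for _, k := range ends {
        points = append(points, external.Endpoint{Key: k, Tp: external.InclusiveEnd, Weight: 1})
    }
    return external.GetMaxOverlapping(points)
}

func main() {
    // Two files covering [a, c] and [c, e] overlap at key "c".
    starts := [][]byte{[]byte("a"), []byte("c")}
    ends := [][]byte{[]byte("c"), []byte("e")}
    fmt.Println(maxOverlapOfFiles(starts, ends)) // prints 2
}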