From 8b33a36cd1a707665c3c262bcf8f340db6fb3bf0 Mon Sep 17 00:00:00 2001 From: tangenta Date: Mon, 14 Aug 2023 11:18:11 +0800 Subject: [PATCH 1/9] br/lightning: add kv writer for external backend --- br/pkg/lightning/backend/external/BUILD.bazel | 16 +- br/pkg/lightning/backend/external/file.go | 4 +- .../lightning/backend/external/file_test.go | 14 +- .../lightning/backend/external/sharedisk.go | 36 -- br/pkg/lightning/backend/external/writer.go | 331 ++++++++++++++++++ .../lightning/backend/external/writer_test.go | 92 +++++ 6 files changed, 446 insertions(+), 47 deletions(-) delete mode 100644 br/pkg/lightning/backend/external/sharedisk.go create mode 100644 br/pkg/lightning/backend/external/writer.go create mode 100644 br/pkg/lightning/backend/external/writer_test.go diff --git a/br/pkg/lightning/backend/external/BUILD.bazel b/br/pkg/lightning/backend/external/BUILD.bazel index 3bb3e1a6a985c..067ed8e1dd356 100644 --- a/br/pkg/lightning/backend/external/BUILD.bazel +++ b/br/pkg/lightning/backend/external/BUILD.bazel @@ -7,16 +7,24 @@ go_library( "codec.go", "file.go", "kv_reader.go", - "sharedisk.go", "stat_reader.go", + "writer.go", ], importpath = "github.com/pingcap/tidb/br/pkg/lightning/backend/external", visibility = ["//visibility:public"], deps = [ + "//br/pkg/lightning/backend", + "//br/pkg/lightning/backend/encode", + "//br/pkg/lightning/backend/kv", + "//br/pkg/lightning/common", + "//br/pkg/membuf", "//br/pkg/storage", + "//kv", "//util/logutil", "//util/mathutil", + "//util/size", "@com_github_pingcap_errors//:errors", + "@org_golang_x_exp//slices", "@org_uber_go_zap//:zap", ], ) @@ -28,14 +36,18 @@ go_test( "byte_reader_test.go", "codec_test.go", "file_test.go", + "writer_test.go", ], embed = [":external"], flaky = True, - shard_count = 7, + shard_count = 8, deps = [ + "//br/pkg/lightning/backend/kv", + "//br/pkg/lightning/common", "//br/pkg/storage", "@com_github_pingcap_errors//:errors", "@com_github_stretchr_testify//require", "@org_golang_x_exp//rand", + "@org_golang_x_exp//slices", ], ) diff --git a/br/pkg/lightning/backend/external/file.go b/br/pkg/lightning/backend/external/file.go index a87f0a255aa5f..13f5673e5b652 100644 --- a/br/pkg/lightning/backend/external/file.go +++ b/br/pkg/lightning/backend/external/file.go @@ -94,8 +94,8 @@ func (s *KeyValueStore) AddKeyValue(key, value []byte) error { s.rc.currProp.size += uint64(len(key) + len(value)) s.rc.currProp.keys++ - if s.rc.currProp.size >= s.rc.propSizeIdxDistance || - s.rc.currProp.keys >= s.rc.propKeysIdxDistance { + if s.rc.currProp.size >= s.rc.propSizeDist || + s.rc.currProp.keys >= s.rc.propKeysDist { newProp := *s.rc.currProp s.rc.props = append(s.rc.props, &newProp) diff --git a/br/pkg/lightning/backend/external/file_test.go b/br/pkg/lightning/backend/external/file_test.go index fa2b5ccbd79c6..82cd2c2e935ae 100644 --- a/br/pkg/lightning/backend/external/file_test.go +++ b/br/pkg/lightning/backend/external/file_test.go @@ -31,8 +31,8 @@ func TestAddKeyValueMaintainRangeProperty(t *testing.T) { writer, err := memStore.Create(ctx, "/test", nil) require.NoError(t, err) rc := &rangePropertiesCollector{ - propSizeIdxDistance: 100, - propKeysIdxDistance: 2, + propSizeDist: 100, + propKeysDist: 2, } rc.reset() initRC := *rc @@ -49,7 +49,7 @@ func TestAddKeyValueMaintainRangeProperty(t *testing.T) { // when not accumulated enough data, no range property will be added. require.Equal(t, &initRC, rc) - // propKeysIdxDistance = 2, so after adding 2 keys, a new range property will be added. + // propKeysDist = 2, so after adding 2 keys, a new range property will be added. k2, v2 := []byte("key2"), []byte("value2") err = kvStore.AddKeyValue(k2, v2) require.NoError(t, err) @@ -76,8 +76,8 @@ func TestAddKeyValueMaintainRangeProperty(t *testing.T) { writer, err = memStore.Create(ctx, "/test2", nil) require.NoError(t, err) rc = &rangePropertiesCollector{ - propSizeIdxDistance: 1, - propKeysIdxDistance: 100, + propSizeDist: 1, + propKeysDist: 100, } rc.reset() kvStore, err = NewKeyValueStore(ctx, writer, rc, 2, 2) @@ -116,8 +116,8 @@ func TestKVReadWrite(t *testing.T) { writer, err := memStore.Create(ctx, "/test", nil) require.NoError(t, err) rc := &rangePropertiesCollector{ - propSizeIdxDistance: 100, - propKeysIdxDistance: 2, + propSizeDist: 100, + propKeysDist: 2, } rc.reset() kvStore, err := NewKeyValueStore(ctx, writer, rc, 1, 1) diff --git a/br/pkg/lightning/backend/external/sharedisk.go b/br/pkg/lightning/backend/external/sharedisk.go deleted file mode 100644 index 5da36bb9f520e..0000000000000 --- a/br/pkg/lightning/backend/external/sharedisk.go +++ /dev/null @@ -1,36 +0,0 @@ -// Copyright 2023 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package external - -// rangePropertiesCollector collects range properties for each range. The zero -// value of rangePropertiesCollector is not ready to use, should call reset() -// first. -type rangePropertiesCollector struct { - props []*rangeProperty - currProp *rangeProperty - propSizeIdxDistance uint64 - propKeysIdxDistance uint64 -} - -func (rc *rangePropertiesCollector) reset() { - rc.props = rc.props[:0] - rc.currProp = &rangeProperty{} -} - -// encode encodes rc.props to a byte slice. -func (rc *rangePropertiesCollector) encode() []byte { - b := make([]byte, 0, 1024) - return encodeMultiProps(b, rc.props) -} diff --git a/br/pkg/lightning/backend/external/writer.go b/br/pkg/lightning/backend/external/writer.go new file mode 100644 index 0000000000000..f364f62df68e3 --- /dev/null +++ b/br/pkg/lightning/backend/external/writer.go @@ -0,0 +1,331 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package external + +import ( + "bytes" + "context" + "encoding/hex" + "path/filepath" + "strconv" + "time" + + "github.com/pingcap/tidb/br/pkg/lightning/backend" + "github.com/pingcap/tidb/br/pkg/lightning/backend/encode" + "github.com/pingcap/tidb/br/pkg/lightning/backend/kv" + "github.com/pingcap/tidb/br/pkg/lightning/common" + "github.com/pingcap/tidb/br/pkg/membuf" + "github.com/pingcap/tidb/br/pkg/storage" + tidbkv "github.com/pingcap/tidb/kv" + "github.com/pingcap/tidb/util/logutil" + "github.com/pingcap/tidb/util/size" + "go.uber.org/zap" + "golang.org/x/exp/slices" +) + +// rangePropertiesCollector collects range properties for each range. The zero +// value of rangePropertiesCollector is not ready to use, should call reset() +// first. +type rangePropertiesCollector struct { + props []*rangeProperty + currProp *rangeProperty + propSizeDist uint64 + propKeysDist uint64 +} + +func (rc *rangePropertiesCollector) reset() { + rc.props = rc.props[:0] + rc.currProp = &rangeProperty{} +} + +// encode encodes rc.props to a byte slice. +func (rc *rangePropertiesCollector) encode() []byte { + b := make([]byte, 0, 1024) + return encodeMultiProps(b, rc.props) +} + +type WriterSummary struct { + WriterID int + Seq int + Min tidbkv.Key + Max tidbkv.Key + TotalSize uint64 +} + +type OnCloseFunc func(summary *WriterSummary) + +func DummyOnCloseFunc(*WriterSummary) {} + +type WriterBuilder struct { + ctx context.Context + store storage.ExternalStorage + + memSizeLimit uint64 + writeBatchSize uint64 + propSizeDist uint64 + propKeysDist uint64 + onClose OnCloseFunc + + bufferPool *membuf.Pool +} + +func NewWriterBuilder() *WriterBuilder { + return &WriterBuilder{ + memSizeLimit: 256 * size.MB, + writeBatchSize: 8 * 1024, + propSizeDist: 1 * size.MB, + propKeysDist: 8 * 1024, + onClose: DummyOnCloseFunc, + } +} + +func (b *WriterBuilder) SetMemorySizeLimit(size uint64) *WriterBuilder { + b.memSizeLimit = size + return b +} + +func (b *WriterBuilder) SetWriterBatchSize(size uint64) *WriterBuilder { + b.writeBatchSize = size + return b +} + +func (b *WriterBuilder) SetPropSizeDistance(dist uint64) *WriterBuilder { + b.propSizeDist = dist + return b +} + +func (b *WriterBuilder) SetPropKeysDistance(dist uint64) *WriterBuilder { + b.propKeysDist = dist + return b +} + +func (b *WriterBuilder) SetOnCloseFunc(onClose OnCloseFunc) *WriterBuilder { + b.onClose = onClose + return b +} + +func (b *WriterBuilder) SetBufferPool(bufferPool *membuf.Pool) *WriterBuilder { + b.bufferPool = bufferPool + return b +} + +func (b *WriterBuilder) Build( + ctx context.Context, + store storage.ExternalStorage, + writerID int, + filenamePrefix string, +) *Writer { + bp := b.bufferPool + if bp == nil { + bp = membuf.NewPool() + } + return &Writer{ + ctx: ctx, + rc: &rangePropertiesCollector{ + props: make([]*rangeProperty, 0, 1024), + currProp: &rangeProperty{}, + propSizeDist: b.propSizeDist, + propKeysDist: b.propKeysDist, + }, + memSizeLimit: b.memSizeLimit, + store: store, + kvBuffer: bp.NewBuffer(), + writeBatch: make([]common.KvPair, 0, b.writeBatchSize), + currentSeq: 0, + filenamePrefix: filenamePrefix, + writerID: writerID, + kvStore: nil, + onClose: b.onClose, + closed: false, + } +} + +// Writer is used to write data into external storage. +type Writer struct { + ctx context.Context + store storage.ExternalStorage + writerID int + currentSeq int + filenamePrefix string + + kvStore *KeyValueStore + rc *rangePropertiesCollector + + memSizeLimit uint64 + + kvBuffer *membuf.Buffer + writeBatch []common.KvPair + + onClose OnCloseFunc + closed bool + + // Statistic information per batch. + batchSize uint64 + + // Statistic information per writer. + minKey tidbkv.Key + maxKey tidbkv.Key + totalSize uint64 +} + +// AppendRows appends rows to the external storage. +// Note that this method is NOT thread-safe. +func (w *Writer) AppendRows(ctx context.Context, _ []string, rows encode.Rows) error { + kvs := kv.Rows2KvPairs(rows) + if len(kvs) == 0 { + return nil + } + for _, pair := range kvs { + w.batchSize += uint64(len(pair.Key) + len(pair.Val)) + buf := w.kvBuffer.AllocBytes(len(pair.Key)) + key := append(buf[:0], pair.Key...) + val := w.kvBuffer.AddBytes(pair.Val) + w.writeBatch = append(w.writeBatch, common.KvPair{Key: key, Val: val}) + if w.batchSize >= w.memSizeLimit { + if err := w.flushKVs(ctx); err != nil { + return err + } + } + } + return nil +} + +func (w *Writer) IsSynced() bool { + return false +} + +func (w *Writer) Close(ctx context.Context) (backend.ChunkFlushStatus, error) { + if w.closed { + return status(true), nil + } + w.closed = true + defer w.kvBuffer.Destroy() + err := w.flushKVs(ctx) + if err != nil { + return status(false), err + } + + logutil.BgLogger().Info("close writer", + zap.Int("writerID", w.writerID), + zap.String("minKey", hex.EncodeToString(w.minKey)), + zap.String("maxKey", hex.EncodeToString(w.maxKey))) + + w.writeBatch = nil + + w.onClose(&WriterSummary{ + WriterID: w.writerID, + Seq: w.currentSeq, + Min: w.minKey, + Max: w.maxKey, + TotalSize: w.totalSize, + }) + return status(true), nil +} + +func (w *Writer) recordMinMax(newMin, newMax tidbkv.Key, size uint64) { + if len(w.minKey) == 0 || newMin.Cmp(w.minKey) < 0 { + w.minKey = newMin.Clone() + } + if len(w.maxKey) == 0 || newMax.Cmp(w.maxKey) > 0 { + w.maxKey = newMax.Clone() + } + w.totalSize += size +} + +type status bool + +func (s status) Flushed() bool { + return bool(s) +} + +func (w *Writer) flushKVs(ctx context.Context) error { + if len(w.writeBatch) == 0 { + return nil + } + + dataWriter, statWriter, err := w.createStorageWriter() + if err != nil { + return err + } + + ts := time.Now() + var saveBytes uint64 + + defer func() { + w.currentSeq++ + err := dataWriter.Close(w.ctx) + if err != nil { + logutil.BgLogger().Error("close data writer failed", zap.Error(err)) + } + err = statWriter.Close(w.ctx) + if err != nil { + logutil.BgLogger().Error("close stat writer failed", zap.Error(err)) + } + logutil.BgLogger().Info("flush kv", + zap.Duration("time", time.Since(ts)), + zap.Uint64("bytes", saveBytes), + zap.Any("rate", float64(saveBytes)/1024.0/1024.0/time.Since(ts).Seconds())) + }() + + slices.SortFunc(w.writeBatch[:], func(i, j common.KvPair) bool { + return bytes.Compare(i.Key, j.Key) < 0 + }) + + w.kvStore, err = NewKeyValueStore(ctx, dataWriter, w.rc, w.writerID, w.currentSeq) + if err != nil { + return err + } + + var kvSize uint64 + for _, pair := range w.writeBatch { + err = w.kvStore.AddKeyValue(pair.Key, pair.Val) + if err != nil { + return err + } + kvSize += uint64(len(pair.Key)) + uint64(len(pair.Val)) + } + + if w.rc.currProp.keys > 0 { + newProp := *w.rc.currProp + w.rc.props = append(w.rc.props, &newProp) + } + _, err = statWriter.Write(w.ctx, w.rc.encode()) + if err != nil { + return err + } + + w.recordMinMax(w.writeBatch[0].Key, w.writeBatch[len(w.writeBatch)-1].Key, kvSize) + + w.writeBatch = w.writeBatch[:0] + w.rc.reset() + w.kvBuffer.Reset() + saveBytes = w.batchSize + w.batchSize = 0 + return nil +} + +func (w *Writer) createStorageWriter() (data, stats storage.ExternalFileWriter, err error) { + dataPath := filepath.Join(w.filenamePrefix, strconv.Itoa(w.currentSeq)) + dataWriter, err := w.store.Create(w.ctx, dataPath, nil) + if err != nil { + return nil, nil, err + } + statPath := filepath.Join(w.filenamePrefix+"_stat", strconv.Itoa(w.currentSeq)) + statsWriter, err := w.store.Create(w.ctx, statPath, nil) + if err != nil { + return nil, nil, err + } + return dataWriter, statsWriter, nil +} diff --git a/br/pkg/lightning/backend/external/writer_test.go b/br/pkg/lightning/backend/external/writer_test.go new file mode 100644 index 0000000000000..a82496a112846 --- /dev/null +++ b/br/pkg/lightning/backend/external/writer_test.go @@ -0,0 +1,92 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package external + +import ( + "bytes" + "context" + "io" + "testing" + "time" + + "github.com/pingcap/tidb/br/pkg/lightning/backend/kv" + "github.com/pingcap/tidb/br/pkg/lightning/common" + "github.com/pingcap/tidb/br/pkg/storage" + "github.com/stretchr/testify/require" + "golang.org/x/exp/rand" + "golang.org/x/exp/slices" +) + +func TestWriter(t *testing.T) { + seed := time.Now().Unix() + rand.Seed(uint64(1)) + t.Logf("seed: %d", seed) + ctx := context.Background() + memStore := storage.NewMemStorage() + + writer := NewWriterBuilder(). + SetPropSizeDistance(100). + SetPropKeysDistance(2). + Build(ctx, memStore, 0, "/test") + + kvCnt := rand.Intn(10) + 10 + kvs := make([]common.KvPair, kvCnt) + for i := 0; i < kvCnt; i++ { + randLen := rand.Intn(10) + 1 + kvs[i].Key = make([]byte, randLen) + _, err := rand.Read(kvs[i].Key) + require.NoError(t, err) + randLen = rand.Intn(10) + 1 + kvs[i].Val = make([]byte, randLen) + _, err = rand.Read(kvs[i].Val) + require.NoError(t, err) + } + rows := kv.MakeRowsFromKvPairs(kvs) + err := writer.AppendRows(ctx, nil, rows) + require.NoError(t, err) + + _, err = writer.Close(ctx) + require.NoError(t, err) + + slices.SortFunc(kvs, func(i, j common.KvPair) bool { + return bytes.Compare(i.Key, j.Key) < 0 + }) + + bufSize := rand.Intn(100) + 1 + kvReader, err := newKVReader(ctx, "/test/0", memStore, 0, bufSize) + require.NoError(t, err) + for i := 0; i < kvCnt; i++ { + key, value, err := kvReader.nextKV() + require.NoError(t, err) + require.Equal(t, kvs[i].Key, key) + require.Equal(t, kvs[i].Val, value) + } + _, _, err = kvReader.nextKV() + require.Equal(t, io.EOF, err) + + statReader, err := newStatsReader(ctx, memStore, "/test_stat/0", bufSize) + require.NoError(t, err) + + var keyCnt uint64 = 0 + for { + p, err := statReader.nextProp() + if err == io.EOF { + break + } + require.NoError(t, err) + keyCnt += p.keys + } + require.Equal(t, uint64(kvCnt), keyCnt) +} From 6261a557e914437b33bb43295cc65a24bcde7de8 Mon Sep 17 00:00:00 2001 From: tangenta Date: Mon, 14 Aug 2023 12:38:33 +0800 Subject: [PATCH 2/9] address comment --- br/pkg/lightning/backend/external/BUILD.bazel | 1 - br/pkg/lightning/backend/external/writer.go | 29 ++++++++++++------- 2 files changed, 18 insertions(+), 12 deletions(-) diff --git a/br/pkg/lightning/backend/external/BUILD.bazel b/br/pkg/lightning/backend/external/BUILD.bazel index 067ed8e1dd356..7a3eb13b54324 100644 --- a/br/pkg/lightning/backend/external/BUILD.bazel +++ b/br/pkg/lightning/backend/external/BUILD.bazel @@ -24,7 +24,6 @@ go_library( "//util/mathutil", "//util/size", "@com_github_pingcap_errors//:errors", - "@org_golang_x_exp//slices", "@org_uber_go_zap//:zap", ], ) diff --git a/br/pkg/lightning/backend/external/writer.go b/br/pkg/lightning/backend/external/writer.go index f364f62df68e3..54e76847c5acb 100644 --- a/br/pkg/lightning/backend/external/writer.go +++ b/br/pkg/lightning/backend/external/writer.go @@ -22,6 +22,8 @@ import ( "strconv" "time" + "slices" + "github.com/pingcap/tidb/br/pkg/lightning/backend" "github.com/pingcap/tidb/br/pkg/lightning/backend/encode" "github.com/pingcap/tidb/br/pkg/lightning/backend/kv" @@ -32,7 +34,6 @@ import ( "github.com/pingcap/tidb/util/logutil" "github.com/pingcap/tidb/util/size" "go.uber.org/zap" - "golang.org/x/exp/slices" ) // rangePropertiesCollector collects range properties for each range. The zero @@ -217,7 +218,7 @@ func (w *Writer) Close(ctx context.Context) (backend.ChunkFlushStatus, error) { return status(false), err } - logutil.BgLogger().Info("close writer", + logutil.Logger(ctx).Info("close writer", zap.Int("writerID", w.writerID), zap.String("minKey", hex.EncodeToString(w.minKey)), zap.String("maxKey", hex.EncodeToString(w.maxKey))) @@ -250,7 +251,7 @@ func (s status) Flushed() bool { return bool(s) } -func (w *Writer) flushKVs(ctx context.Context) error { +func (w *Writer) flushKVs(ctx context.Context) (err error) { if len(w.writeBatch) == 0 { return nil } @@ -265,22 +266,28 @@ func (w *Writer) flushKVs(ctx context.Context) error { defer func() { w.currentSeq++ - err := dataWriter.Close(w.ctx) + err1, err2 := dataWriter.Close(w.ctx), statWriter.Close(w.ctx) if err != nil { - logutil.BgLogger().Error("close data writer failed", zap.Error(err)) + return } - err = statWriter.Close(w.ctx) - if err != nil { - logutil.BgLogger().Error("close stat writer failed", zap.Error(err)) + if err1 != nil { + logutil.Logger(ctx).Error("close data writer failed", zap.Error(err)) + err = err1 + return + } + if err2 != nil { + logutil.Logger(ctx).Error("close stat writer failed", zap.Error(err)) + err = err2 + return } - logutil.BgLogger().Info("flush kv", + logutil.Logger(ctx).Info("flush kv", zap.Duration("time", time.Since(ts)), zap.Uint64("bytes", saveBytes), zap.Any("rate", float64(saveBytes)/1024.0/1024.0/time.Since(ts).Seconds())) }() - slices.SortFunc(w.writeBatch[:], func(i, j common.KvPair) bool { - return bytes.Compare(i.Key, j.Key) < 0 + slices.SortFunc(w.writeBatch[:], func(i, j common.KvPair) int { + return bytes.Compare(i.Key, j.Key) }) w.kvStore, err = NewKeyValueStore(ctx, dataWriter, w.rc, w.writerID, w.currentSeq) From 4318b600eee3f195f7b5bbd8a3af2dbd4d1efc1f Mon Sep 17 00:00:00 2001 From: tangenta Date: Mon, 14 Aug 2023 13:24:14 +0800 Subject: [PATCH 3/9] fix linter --- br/pkg/lightning/backend/external/writer.go | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/br/pkg/lightning/backend/external/writer.go b/br/pkg/lightning/backend/external/writer.go index 54e76847c5acb..4f735e6160172 100644 --- a/br/pkg/lightning/backend/external/writer.go +++ b/br/pkg/lightning/backend/external/writer.go @@ -57,6 +57,7 @@ func (rc *rangePropertiesCollector) encode() []byte { return encodeMultiProps(b, rc.props) } +// WriterSummary is the summary of a writer. type WriterSummary struct { WriterID int Seq int @@ -65,10 +66,13 @@ type WriterSummary struct { TotalSize uint64 } +// OnCloseFunc is the callback function when a writer is closed. type OnCloseFunc func(summary *WriterSummary) +// DummyOnCloseFunc is a dummy OnCloseFunc. func DummyOnCloseFunc(*WriterSummary) {} +// WriterBuilder builds a new Writer. type WriterBuilder struct { ctx context.Context store storage.ExternalStorage @@ -82,6 +86,7 @@ type WriterBuilder struct { bufferPool *membuf.Pool } +// NewWriterBuilder creates a WriterBuilder. func NewWriterBuilder() *WriterBuilder { return &WriterBuilder{ memSizeLimit: 256 * size.MB, @@ -92,36 +97,43 @@ func NewWriterBuilder() *WriterBuilder { } } +// SetMemorySizeLimit sets the memory size limit of the writer. func (b *WriterBuilder) SetMemorySizeLimit(size uint64) *WriterBuilder { b.memSizeLimit = size return b } +// SetWriterBatchSize sets the batch size of the writer. func (b *WriterBuilder) SetWriterBatchSize(size uint64) *WriterBuilder { b.writeBatchSize = size return b } +// SetPropSizeDistance sets the distance of range size for each property. func (b *WriterBuilder) SetPropSizeDistance(dist uint64) *WriterBuilder { b.propSizeDist = dist return b } +// SetPropKeysDistance sets the distance of range keys for each property. func (b *WriterBuilder) SetPropKeysDistance(dist uint64) *WriterBuilder { b.propKeysDist = dist return b } +// SetOnCloseFunc sets the callback function when a writer is closed. func (b *WriterBuilder) SetOnCloseFunc(onClose OnCloseFunc) *WriterBuilder { b.onClose = onClose return b } +// SetBufferPool sets the buffer pool of the writer. func (b *WriterBuilder) SetBufferPool(bufferPool *membuf.Pool) *WriterBuilder { b.bufferPool = bufferPool return b } +// Build builds a new Writer. func (b *WriterBuilder) Build( ctx context.Context, store storage.ExternalStorage, @@ -203,10 +215,12 @@ func (w *Writer) AppendRows(ctx context.Context, _ []string, rows encode.Rows) e return nil } +// IsSynced implements the backend.EngineWriter interface. func (w *Writer) IsSynced() bool { return false } +// Close closes the writer. func (w *Writer) Close(ctx context.Context) (backend.ChunkFlushStatus, error) { if w.closed { return status(true), nil @@ -247,6 +261,7 @@ func (w *Writer) recordMinMax(newMin, newMax tidbkv.Key, size uint64) { type status bool +// Flushed implements the backend.ChunkFlushStatus interface. func (s status) Flushed() bool { return bool(s) } From b4aafb69245361c1ef8b423c919e474920f5ba63 Mon Sep 17 00:00:00 2001 From: tangenta Date: Mon, 14 Aug 2023 14:47:08 +0800 Subject: [PATCH 4/9] address comments --- br/pkg/lightning/backend/external/file.go | 8 +++ br/pkg/lightning/backend/external/writer.go | 65 ++++++++----------- .../lightning/backend/external/writer_test.go | 4 +- 3 files changed, 36 insertions(+), 41 deletions(-) diff --git a/br/pkg/lightning/backend/external/file.go b/br/pkg/lightning/backend/external/file.go index 13f5673e5b652..7ba7c939df4f5 100644 --- a/br/pkg/lightning/backend/external/file.go +++ b/br/pkg/lightning/backend/external/file.go @@ -108,6 +108,14 @@ func (s *KeyValueStore) AddKeyValue(key, value []byte) error { return nil } +// Close closes the KeyValueStore and append the last range property. +func (s *KeyValueStore) Close() { + if s.rc.currProp.keys > 0 { + newProp := *s.rc.currProp + s.rc.props = append(s.rc.props, &newProp) + } +} + var statSuffix = filepath.Join("_stat", "0") // GetAllFileNames returns a FilePathHandle that contains all data file paths diff --git a/br/pkg/lightning/backend/external/writer.go b/br/pkg/lightning/backend/external/writer.go index 4f735e6160172..f88fd80e5d406 100644 --- a/br/pkg/lightning/backend/external/writer.go +++ b/br/pkg/lightning/backend/external/writer.go @@ -74,14 +74,11 @@ func DummyOnCloseFunc(*WriterSummary) {} // WriterBuilder builds a new Writer. type WriterBuilder struct { - ctx context.Context - store storage.ExternalStorage - - memSizeLimit uint64 - writeBatchSize uint64 - propSizeDist uint64 - propKeysDist uint64 - onClose OnCloseFunc + memSizeLimit uint64 + writeBatchCount uint64 + propSizeDist uint64 + propKeysDist uint64 + onClose OnCloseFunc bufferPool *membuf.Pool } @@ -89,11 +86,11 @@ type WriterBuilder struct { // NewWriterBuilder creates a WriterBuilder. func NewWriterBuilder() *WriterBuilder { return &WriterBuilder{ - memSizeLimit: 256 * size.MB, - writeBatchSize: 8 * 1024, - propSizeDist: 1 * size.MB, - propKeysDist: 8 * 1024, - onClose: DummyOnCloseFunc, + memSizeLimit: 256 * size.MB, + writeBatchCount: 8 * 1024, + propSizeDist: 1 * size.MB, + propKeysDist: 8 * 1024, + onClose: DummyOnCloseFunc, } } @@ -103,9 +100,9 @@ func (b *WriterBuilder) SetMemorySizeLimit(size uint64) *WriterBuilder { return b } -// SetWriterBatchSize sets the batch size of the writer. -func (b *WriterBuilder) SetWriterBatchSize(size uint64) *WriterBuilder { - b.writeBatchSize = size +// SetWriterBatchCount sets the batch count of the writer. +func (b *WriterBuilder) SetWriterBatchCount(count uint64) *WriterBuilder { + b.writeBatchCount = count return b } @@ -135,7 +132,6 @@ func (b *WriterBuilder) SetBufferPool(bufferPool *membuf.Pool) *WriterBuilder { // Build builds a new Writer. func (b *WriterBuilder) Build( - ctx context.Context, store storage.ExternalStorage, writerID int, filenamePrefix string, @@ -145,7 +141,6 @@ func (b *WriterBuilder) Build( bp = membuf.NewPool() } return &Writer{ - ctx: ctx, rc: &rangePropertiesCollector{ props: make([]*rangeProperty, 0, 1024), currProp: &rangeProperty{}, @@ -155,7 +150,7 @@ func (b *WriterBuilder) Build( memSizeLimit: b.memSizeLimit, store: store, kvBuffer: bp.NewBuffer(), - writeBatch: make([]common.KvPair, 0, b.writeBatchSize), + writeBatch: make([]common.KvPair, 0, b.writeBatchCount), currentSeq: 0, filenamePrefix: filenamePrefix, writerID: writerID, @@ -167,7 +162,6 @@ func (b *WriterBuilder) Build( // Writer is used to write data into external storage. type Writer struct { - ctx context.Context store storage.ExternalStorage writerID int currentSeq int @@ -197,13 +191,9 @@ type Writer struct { // Note that this method is NOT thread-safe. func (w *Writer) AppendRows(ctx context.Context, _ []string, rows encode.Rows) error { kvs := kv.Rows2KvPairs(rows) - if len(kvs) == 0 { - return nil - } for _, pair := range kvs { w.batchSize += uint64(len(pair.Key) + len(pair.Val)) - buf := w.kvBuffer.AllocBytes(len(pair.Key)) - key := append(buf[:0], pair.Key...) + key := w.kvBuffer.AddBytes(pair.Key) val := w.kvBuffer.AddBytes(pair.Val) w.writeBatch = append(w.writeBatch, common.KvPair{Key: key, Val: val}) if w.batchSize >= w.memSizeLimit { @@ -271,17 +261,17 @@ func (w *Writer) flushKVs(ctx context.Context) (err error) { return nil } - dataWriter, statWriter, err := w.createStorageWriter() + dataWriter, statWriter, err := w.createStorageWriter(ctx) if err != nil { return err } ts := time.Now() - var saveBytes uint64 + var savedBytes uint64 defer func() { w.currentSeq++ - err1, err2 := dataWriter.Close(w.ctx), statWriter.Close(w.ctx) + err1, err2 := dataWriter.Close(ctx), statWriter.Close(ctx) if err != nil { return } @@ -297,8 +287,8 @@ func (w *Writer) flushKVs(ctx context.Context) (err error) { } logutil.Logger(ctx).Info("flush kv", zap.Duration("time", time.Since(ts)), - zap.Uint64("bytes", saveBytes), - zap.Any("rate", float64(saveBytes)/1024.0/1024.0/time.Since(ts).Seconds())) + zap.Uint64("bytes", savedBytes), + zap.Any("rate", float64(savedBytes)/1024.0/1024.0/time.Since(ts).Seconds())) }() slices.SortFunc(w.writeBatch[:], func(i, j common.KvPair) int { @@ -319,11 +309,8 @@ func (w *Writer) flushKVs(ctx context.Context) (err error) { kvSize += uint64(len(pair.Key)) + uint64(len(pair.Val)) } - if w.rc.currProp.keys > 0 { - newProp := *w.rc.currProp - w.rc.props = append(w.rc.props, &newProp) - } - _, err = statWriter.Write(w.ctx, w.rc.encode()) + w.kvStore.Close() + _, err = statWriter.Write(ctx, w.rc.encode()) if err != nil { return err } @@ -333,19 +320,19 @@ func (w *Writer) flushKVs(ctx context.Context) (err error) { w.writeBatch = w.writeBatch[:0] w.rc.reset() w.kvBuffer.Reset() - saveBytes = w.batchSize + savedBytes = w.batchSize w.batchSize = 0 return nil } -func (w *Writer) createStorageWriter() (data, stats storage.ExternalFileWriter, err error) { +func (w *Writer) createStorageWriter(ctx context.Context) (data, stats storage.ExternalFileWriter, err error) { dataPath := filepath.Join(w.filenamePrefix, strconv.Itoa(w.currentSeq)) - dataWriter, err := w.store.Create(w.ctx, dataPath, nil) + dataWriter, err := w.store.Create(ctx, dataPath, nil) if err != nil { return nil, nil, err } statPath := filepath.Join(w.filenamePrefix+"_stat", strconv.Itoa(w.currentSeq)) - statsWriter, err := w.store.Create(w.ctx, statPath, nil) + statsWriter, err := w.store.Create(ctx, statPath, nil) if err != nil { return nil, nil, err } diff --git a/br/pkg/lightning/backend/external/writer_test.go b/br/pkg/lightning/backend/external/writer_test.go index a82496a112846..62fddfe78dc2c 100644 --- a/br/pkg/lightning/backend/external/writer_test.go +++ b/br/pkg/lightning/backend/external/writer_test.go @@ -31,7 +31,7 @@ import ( func TestWriter(t *testing.T) { seed := time.Now().Unix() - rand.Seed(uint64(1)) + rand.Seed(uint64(seed)) t.Logf("seed: %d", seed) ctx := context.Background() memStore := storage.NewMemStorage() @@ -39,7 +39,7 @@ func TestWriter(t *testing.T) { writer := NewWriterBuilder(). SetPropSizeDistance(100). SetPropKeysDistance(2). - Build(ctx, memStore, 0, "/test") + Build(memStore, 0, "/test") kvCnt := rand.Intn(10) + 10 kvs := make([]common.KvPair, kvCnt) From 7ee55426288497811ebe569496e25622cb03abf0 Mon Sep 17 00:00:00 2001 From: tangenta Date: Mon, 14 Aug 2023 15:53:57 +0800 Subject: [PATCH 5/9] add coverage for kvstore.close() --- br/pkg/lightning/backend/external/file_test.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/br/pkg/lightning/backend/external/file_test.go b/br/pkg/lightning/backend/external/file_test.go index 82cd2c2e935ae..3bfd1532dde1a 100644 --- a/br/pkg/lightning/backend/external/file_test.go +++ b/br/pkg/lightning/backend/external/file_test.go @@ -72,6 +72,7 @@ func TestAddKeyValueMaintainRangeProperty(t *testing.T) { err = writer.Close(ctx) require.NoError(t, err) + kvStore.Close() writer, err = memStore.Create(ctx, "/test2", nil) require.NoError(t, err) @@ -103,6 +104,7 @@ func TestAddKeyValueMaintainRangeProperty(t *testing.T) { keys: 1, } require.Equal(t, expected, rc.props[1]) + kvStore.Close() err = writer.Close(ctx) require.NoError(t, err) } From 86e0624c5447f69818ae592646d0b6ea4b2ae244 Mon Sep 17 00:00:00 2001 From: tangenta Date: Mon, 14 Aug 2023 16:53:57 +0800 Subject: [PATCH 6/9] add test for kvstore.close() --- br/pkg/lightning/backend/external/file_test.go | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/br/pkg/lightning/backend/external/file_test.go b/br/pkg/lightning/backend/external/file_test.go index 3bfd1532dde1a..352a94682de17 100644 --- a/br/pkg/lightning/backend/external/file_test.go +++ b/br/pkg/lightning/backend/external/file_test.go @@ -73,6 +73,14 @@ func TestAddKeyValueMaintainRangeProperty(t *testing.T) { err = writer.Close(ctx) require.NoError(t, err) kvStore.Close() + expected = &rangeProperty{ + key: k3, + offset: uint64(len(k1) + len(v1) + 16 + len(k2) + len(v2) + 16), + size: uint64(len(k3) + len(v3)), + keys: 1, + } + require.Len(t, rc.props, 2) + require.Equal(t, expected, rc.props[1]) writer, err = memStore.Create(ctx, "/test2", nil) require.NoError(t, err) @@ -105,6 +113,8 @@ func TestAddKeyValueMaintainRangeProperty(t *testing.T) { } require.Equal(t, expected, rc.props[1]) kvStore.Close() + // Length of properties should not change after close. + require.Len(t, rc.props, 2) err = writer.Close(ctx) require.NoError(t, err) } From 73c158c97ade6bd0ca11c6e8c354ef90a50066e7 Mon Sep 17 00:00:00 2001 From: tangenta Date: Tue, 15 Aug 2023 16:05:57 +0800 Subject: [PATCH 7/9] test: add test for file names --- .../lightning/backend/external/writer_test.go | 50 +++++++++++++++++++ 1 file changed, 50 insertions(+) diff --git a/br/pkg/lightning/backend/external/writer_test.go b/br/pkg/lightning/backend/external/writer_test.go index 62fddfe78dc2c..b540b82898516 100644 --- a/br/pkg/lightning/backend/external/writer_test.go +++ b/br/pkg/lightning/backend/external/writer_test.go @@ -17,7 +17,9 @@ package external import ( "bytes" "context" + "fmt" "io" + "strings" "testing" "time" @@ -90,3 +92,51 @@ func TestWriter(t *testing.T) { } require.Equal(t, uint64(kvCnt), keyCnt) } + +func TestWriterFlushMultiFileNames(t *testing.T) { + seed := time.Now().Unix() + rand.Seed(uint64(seed)) + t.Logf("seed: %d", seed) + ctx := context.Background() + memStore := storage.NewMemStorage() + + writer := NewWriterBuilder(). + SetPropKeysDistance(2). + SetMemorySizeLimit(60). + Build(memStore, 0, "/test") + + // 200 bytes key values. + kvCnt := 10 + kvs := make([]common.KvPair, kvCnt) + for i := 0; i < kvCnt; i++ { + kvs[i].Key = make([]byte, 10) + _, err := rand.Read(kvs[i].Key) + require.NoError(t, err) + kvs[i].Val = make([]byte, 10) + _, err = rand.Read(kvs[i].Val) + require.NoError(t, err) + } + rows := kv.MakeRowsFromKvPairs(kvs) + err := writer.AppendRows(ctx, nil, rows) + require.NoError(t, err) + + _, err = writer.Close(ctx) + require.NoError(t, err) + + var dataFiles, statFiles []string + err = memStore.WalkDir(ctx, &storage.WalkOption{SubDir: "/test"}, func(path string, size int64) error { + if strings.Contains(path, "_stat") { + statFiles = append(statFiles, path) + } else { + dataFiles = append(dataFiles, path) + } + return nil + }) + require.NoError(t, err) + require.Len(t, dataFiles, 4) + require.Len(t, statFiles, 4) + for i := 0; i < 4; i++ { + require.Equal(t, dataFiles[i], fmt.Sprintf("/test/%d", i)) + require.Equal(t, statFiles[i], fmt.Sprintf("/test_stat/%d", i)) + } +} From 80aeabf8cedc1608e830eb3a3e8d0fa9cd909079 Mon Sep 17 00:00:00 2001 From: tangenta Date: Tue, 15 Aug 2023 16:13:55 +0800 Subject: [PATCH 8/9] return error if writer is closed --- br/pkg/lightning/backend/external/writer.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/br/pkg/lightning/backend/external/writer.go b/br/pkg/lightning/backend/external/writer.go index f88fd80e5d406..79c6349953ba1 100644 --- a/br/pkg/lightning/backend/external/writer.go +++ b/br/pkg/lightning/backend/external/writer.go @@ -24,6 +24,7 @@ import ( "slices" + "github.com/pingcap/errors" "github.com/pingcap/tidb/br/pkg/lightning/backend" "github.com/pingcap/tidb/br/pkg/lightning/backend/encode" "github.com/pingcap/tidb/br/pkg/lightning/backend/kv" @@ -213,7 +214,7 @@ func (w *Writer) IsSynced() bool { // Close closes the writer. func (w *Writer) Close(ctx context.Context) (backend.ChunkFlushStatus, error) { if w.closed { - return status(true), nil + return status(false), errors.Errorf("writer %d has been closed", w.writerID) } w.closed = true defer w.kvBuffer.Destroy() From 365bf3d966fb873799d535f18fceb6dae271d01e Mon Sep 17 00:00:00 2001 From: tangenta Date: Tue, 15 Aug 2023 17:03:57 +0800 Subject: [PATCH 9/9] fix build --- br/pkg/lightning/backend/external/iter_test.go | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/br/pkg/lightning/backend/external/iter_test.go b/br/pkg/lightning/backend/external/iter_test.go index 1698c76831307..83d40f1206e7b 100644 --- a/br/pkg/lightning/backend/external/iter_test.go +++ b/br/pkg/lightning/backend/external/iter_test.go @@ -66,8 +66,8 @@ func TestMergeKVIter(t *testing.T) { writer, err := memStore.Create(ctx, filename, nil) require.NoError(t, err) rc := &rangePropertiesCollector{ - propSizeIdxDistance: 100, - propKeysIdxDistance: 2, + propSizeDist: 100, + propKeysDist: 2, } rc.reset() kvStore, err := NewKeyValueStore(ctx, writer, rc, 1, 1) @@ -118,8 +118,8 @@ func TestOneUpstream(t *testing.T) { writer, err := memStore.Create(ctx, filename, nil) require.NoError(t, err) rc := &rangePropertiesCollector{ - propSizeIdxDistance: 100, - propKeysIdxDistance: 2, + propSizeDist: 100, + propKeysDist: 2, } rc.reset() kvStore, err := NewKeyValueStore(ctx, writer, rc, 1, 1) @@ -196,8 +196,8 @@ func TestCorruptContent(t *testing.T) { writer, err := memStore.Create(ctx, filename, nil) require.NoError(t, err) rc := &rangePropertiesCollector{ - propSizeIdxDistance: 100, - propKeysIdxDistance: 2, + propSizeDist: 100, + propKeysDist: 2, } rc.reset() kvStore, err := NewKeyValueStore(ctx, writer, rc, 1, 1) @@ -249,8 +249,8 @@ func generateMockFileReader() *kvReader { panic(err) } rc := &rangePropertiesCollector{ - propSizeIdxDistance: 100, - propKeysIdxDistance: 2, + propSizeDist: 100, + propKeysDist: 2, } rc.reset() kvStore, err := NewKeyValueStore(ctx, writer, rc, 1, 1)