Skip to content
This repository has been archived by the owner on Jul 24, 2024. It is now read-only.

lightning: refactor the backend package #877

Merged
merged 6 commits into from
Mar 17, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -112,14 +112,14 @@ test: export ARGS=$$($(PACKAGES))
test:
$(PREPARE_MOD)
@make failpoint-enable
$(GOTEST) $(RACEFLAG) -tags br_test,leak $(ARGS) || ( make failpoint-disable && exit 1 )
$(GOTEST) $(RACEFLAG) -tags leak $(ARGS) || ( make failpoint-disable && exit 1 )
@make failpoint-disable

testcover: tools
mkdir -p "$(TEST_DIR)"
$(PREPARE_MOD)
@make failpoint-enable
$(GOTEST) -tags br_test -cover -covermode=count -coverprofile="$(TEST_DIR)/cov.unit.out" \
$(GOTEST) -cover -covermode=count -coverprofile="$(TEST_DIR)/cov.unit.out" \
$$($(COVERED_PACKAGES)) || ( make failpoint-disable && exit 1 )
@make failpoint-disable

Expand Down Expand Up @@ -186,7 +186,7 @@ static: prepare tools
@# exhaustivestruct - Protobuf structs have hidden fields, like "XXX_NoUnkeyedLiteral"
@# exhaustive - no need to check exhaustiveness of enum switch statements
@# gosec - too many false positive
CGO_ENABLED=0 tools/bin/golangci-lint run --enable-all --build-tags br_test --deadline 120s \
CGO_ENABLED=0 tools/bin/golangci-lint run --enable-all --deadline 120s \
--disable gochecknoglobals \
--disable goimports \
--disable gofmt \
Expand Down
41 changes: 22 additions & 19 deletions cmd/tidb-lightning-ctl/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,14 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/import_sstpb"

kv "github.com/pingcap/br/pkg/lightning/backend"
"github.com/pingcap/br/pkg/lightning/backend"
"github.com/pingcap/br/pkg/lightning/backend/importer"
"github.com/pingcap/br/pkg/lightning/backend/local"
"github.com/pingcap/br/pkg/lightning/checkpoints"
"github.com/pingcap/br/pkg/lightning/common"
"github.com/pingcap/br/pkg/lightning/config"
"github.com/pingcap/br/pkg/lightning/restore"
"github.com/pingcap/br/pkg/lightning/tikv"
)

func main() {
Expand Down Expand Up @@ -133,12 +136,12 @@ func run() error {
}

func compactCluster(ctx context.Context, cfg *config.Config, tls *common.TLS) error {
return kv.ForAllStores(
return tikv.ForAllStores(
ctx,
tls.WithHost(cfg.TiDB.PdAddr),
kv.StoreStateDisconnected,
func(c context.Context, store *kv.Store) error {
return kv.Compact(c, tls, store.Address, restore.FullLevelCompact)
tikv.StoreStateDisconnected,
func(c context.Context, store *tikv.Store) error {
return tikv.Compact(c, tls, store.Address, restore.FullLevelCompact)
},
)
}
Expand All @@ -154,23 +157,23 @@ func switchMode(ctx context.Context, cfg *config.Config, tls *common.TLS, mode s
return errors.Errorf("invalid mode %s, must use %s or %s", mode, config.ImportMode, config.NormalMode)
}

return kv.ForAllStores(
return tikv.ForAllStores(
ctx,
tls.WithHost(cfg.TiDB.PdAddr),
kv.StoreStateDisconnected,
func(c context.Context, store *kv.Store) error {
return kv.SwitchMode(c, tls, store.Address, m)
tikv.StoreStateDisconnected,
func(c context.Context, store *tikv.Store) error {
return tikv.SwitchMode(c, tls, store.Address, m)
},
)
}

func fetchMode(ctx context.Context, cfg *config.Config, tls *common.TLS) error {
return kv.ForAllStores(
return tikv.ForAllStores(
ctx,
tls.WithHost(cfg.TiDB.PdAddr),
kv.StoreStateDisconnected,
func(c context.Context, store *kv.Store) error {
mode, err := kv.FetchMode(c, tls, store.Address)
tikv.StoreStateDisconnected,
func(c context.Context, store *tikv.Store) error {
mode, err := tikv.FetchMode(c, tls, store.Address)
if err != nil {
fmt.Fprintf(os.Stderr, "%-30s | Error: %v\n", store.Address, err)
} else {
Expand Down Expand Up @@ -231,7 +234,7 @@ func checkpointErrorDestroy(ctx context.Context, cfg *config.Config, tls *common
}

if cfg.TikvImporter.Backend == "importer" {
importer, err := kv.NewImporter(ctx, tls, cfg.TikvImporter.Addr, cfg.TiDB.PdAddr)
importer, err := importer.NewImporter(ctx, tls, cfg.TikvImporter.Addr, cfg.TiDB.PdAddr)
if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -259,8 +262,8 @@ func checkpointErrorDestroy(ctx context.Context, cfg *config.Config, tls *common
for _, table := range targetTables {
for engineID := table.MinEngineID; engineID <= table.MaxEngineID; engineID++ {
fmt.Fprintln(os.Stderr, "Closing and cleaning up engine:", table.TableName, engineID)
_, eID := kv.MakeUUID(table.TableName, engineID)
file := kv.LocalFile{Uuid: eID}
_, eID := backend.MakeUUID(table.TableName, engineID)
file := local.File{Uuid: eID}
err := file.Cleanup(cfg.TikvImporter.SortedKVDir)
if err != nil {
fmt.Fprintln(os.Stderr, "* Encountered error while cleanup engine:", err)
Expand Down Expand Up @@ -357,7 +360,7 @@ func getLocalStoringTables(ctx context.Context, cfg *config.Config) (err2 error)
return nil
}

func unsafeCloseEngine(ctx context.Context, importer kv.Backend, engine string) (*kv.ClosedEngine, error) {
func unsafeCloseEngine(ctx context.Context, importer backend.Backend, engine string) (*backend.ClosedEngine, error) {
if index := strings.LastIndexByte(engine, ':'); index >= 0 {
tableName := engine[:index]
engineID, err := strconv.Atoi(engine[index+1:])
Expand All @@ -378,7 +381,7 @@ func unsafeCloseEngine(ctx context.Context, importer kv.Backend, engine string)
}

func importEngine(ctx context.Context, cfg *config.Config, tls *common.TLS, engine string) error {
importer, err := kv.NewImporter(ctx, tls, cfg.TikvImporter.Addr, cfg.TiDB.PdAddr)
importer, err := importer.NewImporter(ctx, tls, cfg.TikvImporter.Addr, cfg.TiDB.PdAddr)
if err != nil {
return errors.Trace(err)
}
Expand All @@ -392,7 +395,7 @@ func importEngine(ctx context.Context, cfg *config.Config, tls *common.TLS, engi
}

func cleanupEngine(ctx context.Context, cfg *config.Config, tls *common.TLS, engine string) error {
importer, err := kv.NewImporter(ctx, tls, cfg.TikvImporter.Addr, cfg.TiDB.PdAddr)
importer, err := importer.NewImporter(ctx, tls, cfg.TikvImporter.Addr, cfg.TiDB.PdAddr)
if err != nil {
return errors.Trace(err)
}
Expand Down
70 changes: 10 additions & 60 deletions pkg/lightning/backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,16 @@ import (
"github.com/pingcap/failpoint"
"github.com/pingcap/parser/model"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/types"
"go.uber.org/zap"

"github.com/pingcap/br/pkg/lightning/backend/kv"
"github.com/pingcap/br/pkg/lightning/common"
"github.com/pingcap/br/pkg/lightning/log"
"github.com/pingcap/br/pkg/lightning/metric"
"github.com/pingcap/br/pkg/lightning/verification"
)

const (
maxRetryTimes = 3 // tikv-importer has done retry internally. so we don't retry many times.
importMaxRetryTimes = 3 // tikv-importer has done retry internally. so we don't retry many times.
)

/*
Expand Down Expand Up @@ -102,7 +101,7 @@ type AbstractBackend interface {
Close()

// MakeEmptyRows creates an empty collection of encoded rows.
MakeEmptyRows() Rows
MakeEmptyRows() kv.Rows

// RetryImportDelay returns the duration to sleep when retrying an import
RetryImportDelay() time.Duration
Expand All @@ -112,7 +111,7 @@ type AbstractBackend interface {
ShouldPostProcess() bool

// NewEncoder creates an encoder of a TiDB table.
NewEncoder(tbl table.Table, options *SessionOptions) (Encoder, error)
NewEncoder(tbl table.Table, options *kv.SessionOptions) (kv.Encoder, error)

OpenEngine(ctx context.Context, engineUUID uuid.UUID) error

Expand Down Expand Up @@ -165,15 +164,6 @@ type AbstractBackend interface {
LocalWriter(ctx context.Context, engineUUID uuid.UUID) (EngineWriter, error)
}

func fetchRemoteTableModelsFromTLS(ctx context.Context, tls *common.TLS, schema string) ([]*model.TableInfo, error) {
var tables []*model.TableInfo
err := tls.GetJSON(ctx, "/schema/"+schema, &tables)
if err != nil {
return nil, errors.Annotatef(err, "cannot read schema '%s' from remote", schema)
}
return tables, nil
}

// Backend is the delivery target for Lightning
type Backend struct {
abstract AbstractBackend
Expand Down Expand Up @@ -221,11 +211,11 @@ func (be Backend) Close() {
be.abstract.Close()
}

func (be Backend) MakeEmptyRows() Rows {
func (be Backend) MakeEmptyRows() kv.Rows {
return be.abstract.MakeEmptyRows()
}

func (be Backend) NewEncoder(tbl table.Table, options *SessionOptions) (Encoder, error) {
func (be Backend) NewEncoder(tbl table.Table, options *kv.SessionOptions) (kv.Encoder, error) {
return be.abstract.NewEncoder(tbl, options)
}

Expand Down Expand Up @@ -353,7 +343,7 @@ func (engine *OpenedEngine) LocalWriter(ctx context.Context) (*LocalEngineWriter
}

// WriteRows writes a collection of encoded rows into the engine.
func (w *LocalEngineWriter) WriteRows(ctx context.Context, columnNames []string, rows Rows) error {
func (w *LocalEngineWriter) WriteRows(ctx context.Context, columnNames []string, rows kv.Rows) error {
return w.writer.AppendRows(ctx, w.tableName, columnNames, w.ts, rows)
}

Expand Down Expand Up @@ -398,7 +388,7 @@ func (en engine) unsafeClose(ctx context.Context) (*ClosedEngine, error) {
func (engine *ClosedEngine) Import(ctx context.Context) error {
var err error

for i := 0; i < maxRetryTimes; i++ {
for i := 0; i < importMaxRetryTimes; i++ {
task := engine.logger.With(zap.Int("retryCnt", i)).Begin(zap.InfoLevel, "import")
err = engine.backend.ImportEngine(ctx, engine.uuid)
if !common.IsRetryableError(err) {
Expand All @@ -409,7 +399,7 @@ func (engine *ClosedEngine) Import(ctx context.Context) error {
time.Sleep(engine.backend.RetryImportDelay())
}

return errors.Annotatef(err, "[%s] import reach max retry %d and still failed", engine.uuid, maxRetryTimes)
return errors.Annotatef(err, "[%s] import reach max retry %d and still failed", engine.uuid, importMaxRetryTimes)
}

// Cleanup deletes the intermediate data from target.
Expand All @@ -424,53 +414,13 @@ func (engine *ClosedEngine) Logger() log.Logger {
return engine.logger
}

// Encoder encodes a row of SQL values into some opaque type which can be
// consumed by OpenEngine.WriteEncoded.
type Encoder interface {
// Close the encoder.
Close()

// Encode encodes a row of SQL values into a backend-friendly format.
Encode(
logger log.Logger,
row []types.Datum,
rowID int64,
columnPermutation []int,
) (Row, error)
}

// Row represents a single encoded row.
type Row interface {
// ClassifyAndAppend separates the data-like and index-like parts of the
// encoded row, and appends these parts into the existing buffers and
// checksums.
ClassifyAndAppend(
data *Rows,
dataChecksum *verification.KVChecksum,
indices *Rows,
indexChecksum *verification.KVChecksum,
)
}

// Rows represents a collection of encoded rows.
type Rows interface {
// SplitIntoChunks splits the rows into multiple consecutive parts, each
// part having total byte size less than `splitSize`. The meaning of "byte
// size" should be consistent with the value used in `Row.ClassifyAndAppend`.
SplitIntoChunks(splitSize int) []Rows

// Clear returns a new collection with empty content. It may share the
// capacity with the current instance. The typical usage is `x = x.Clear()`.
Clear() Rows
}

type EngineWriter interface {
AppendRows(
ctx context.Context,
tableName string,
columnNames []string,
commitTS uint64,
rows Rows,
rows kv.Rows,
) error
Close() error
}
14 changes: 10 additions & 4 deletions pkg/lightning/backend/backend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package backend_test

import (
"context"
"testing"
"time"

"github.com/golang/mock/gomock"
Expand All @@ -11,26 +12,31 @@ import (
"github.com/pingcap/parser/mysql"
"github.com/pingcap/tidb/store/tikv/oracle"

kv "github.com/pingcap/br/pkg/lightning/backend"
"github.com/pingcap/br/pkg/lightning/backend"
"github.com/pingcap/br/pkg/lightning/backend/kv"
"github.com/pingcap/br/pkg/mock"
)

type backendSuite struct {
controller *gomock.Controller
mockBackend *mock.MockBackend
backend kv.Backend
backend backend.Backend
ts uint64
}

var _ = Suite(&backendSuite{})

func Test(t *testing.T) {
TestingT(t)
}

// FIXME: Cannot use the real SetUpTest/TearDownTest to set up the mock
// otherwise the mock error will be ignored.

func (s *backendSuite) setUpTest(c *C) {
s.controller = gomock.NewController(c)
s.mockBackend = mock.NewMockBackend(s.controller)
s.backend = kv.MakeBackend(s.mockBackend)
s.backend = backend.MakeBackend(s.mockBackend)
s.ts = oracle.ComposeTS(time.Now().Unix()*1000, 0)
}

Expand Down Expand Up @@ -334,7 +340,7 @@ func (s *backendSuite) TestCheckDiskQuota(c *C) {
uuid7 := uuid.MustParse("77777777-7777-7777-7777-777777777777")
uuid9 := uuid.MustParse("99999999-9999-9999-9999-999999999999")

fileSizes := []kv.EngineFileSize{
fileSizes := []backend.EngineFileSize{
{
UUID: uuid1,
DiskSize: 1000,
Expand Down
Loading