Skip to content

Commit

Permalink
lightning: refactor the backend package (pingcap#877) (pingcap#902)
Browse files Browse the repository at this point in the history
Signed-off-by: ti-srebot <ti-srebot@pingcap.com>
  • Loading branch information
ti-srebot authored Mar 19, 2021
1 parent a21ecde commit c01fcc7
Show file tree
Hide file tree
Showing 44 changed files with 961 additions and 933 deletions.
6 changes: 3 additions & 3 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -112,14 +112,14 @@ test: export ARGS=$$($(PACKAGES))
test:
$(PREPARE_MOD)
@make failpoint-enable
$(GOTEST) $(RACEFLAG) -tags br_test,leak $(ARGS) || ( make failpoint-disable && exit 1 )
$(GOTEST) $(RACEFLAG) -tags leak $(ARGS) || ( make failpoint-disable && exit 1 )
@make failpoint-disable

testcover: tools
mkdir -p "$(TEST_DIR)"
$(PREPARE_MOD)
@make failpoint-enable
$(GOTEST) -tags br_test -cover -covermode=count -coverprofile="$(TEST_DIR)/cov.unit.out" \
$(GOTEST) -cover -covermode=count -coverprofile="$(TEST_DIR)/cov.unit.out" \
$$($(COVERED_PACKAGES)) || ( make failpoint-disable && exit 1 )
@make failpoint-disable

Expand Down Expand Up @@ -186,7 +186,7 @@ static: prepare tools
@# exhaustivestruct - Protobuf structs have hidden fields, like "XXX_NoUnkeyedLiteral"
@# exhaustive - no need to check exhaustiveness of enum switch statements
@# gosec - too many false positive
CGO_ENABLED=0 tools/bin/golangci-lint run --enable-all --build-tags br_test --deadline 120s \
CGO_ENABLED=0 tools/bin/golangci-lint run --enable-all --deadline 120s \
--disable gochecknoglobals \
--disable goimports \
--disable gofmt \
Expand Down
41 changes: 22 additions & 19 deletions cmd/tidb-lightning-ctl/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,14 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/import_sstpb"

kv "github.com/pingcap/br/pkg/lightning/backend"
"github.com/pingcap/br/pkg/lightning/backend"
"github.com/pingcap/br/pkg/lightning/backend/importer"
"github.com/pingcap/br/pkg/lightning/backend/local"
"github.com/pingcap/br/pkg/lightning/checkpoints"
"github.com/pingcap/br/pkg/lightning/common"
"github.com/pingcap/br/pkg/lightning/config"
"github.com/pingcap/br/pkg/lightning/restore"
"github.com/pingcap/br/pkg/lightning/tikv"
)

func main() {
Expand Down Expand Up @@ -133,12 +136,12 @@ func run() error {
}

func compactCluster(ctx context.Context, cfg *config.Config, tls *common.TLS) error {
return kv.ForAllStores(
return tikv.ForAllStores(
ctx,
tls.WithHost(cfg.TiDB.PdAddr),
kv.StoreStateDisconnected,
func(c context.Context, store *kv.Store) error {
return kv.Compact(c, tls, store.Address, restore.FullLevelCompact)
tikv.StoreStateDisconnected,
func(c context.Context, store *tikv.Store) error {
return tikv.Compact(c, tls, store.Address, restore.FullLevelCompact)
},
)
}
Expand All @@ -154,23 +157,23 @@ func switchMode(ctx context.Context, cfg *config.Config, tls *common.TLS, mode s
return errors.Errorf("invalid mode %s, must use %s or %s", mode, config.ImportMode, config.NormalMode)
}

return kv.ForAllStores(
return tikv.ForAllStores(
ctx,
tls.WithHost(cfg.TiDB.PdAddr),
kv.StoreStateDisconnected,
func(c context.Context, store *kv.Store) error {
return kv.SwitchMode(c, tls, store.Address, m)
tikv.StoreStateDisconnected,
func(c context.Context, store *tikv.Store) error {
return tikv.SwitchMode(c, tls, store.Address, m)
},
)
}

func fetchMode(ctx context.Context, cfg *config.Config, tls *common.TLS) error {
return kv.ForAllStores(
return tikv.ForAllStores(
ctx,
tls.WithHost(cfg.TiDB.PdAddr),
kv.StoreStateDisconnected,
func(c context.Context, store *kv.Store) error {
mode, err := kv.FetchMode(c, tls, store.Address)
tikv.StoreStateDisconnected,
func(c context.Context, store *tikv.Store) error {
mode, err := tikv.FetchMode(c, tls, store.Address)
if err != nil {
fmt.Fprintf(os.Stderr, "%-30s | Error: %v\n", store.Address, err)
} else {
Expand Down Expand Up @@ -231,7 +234,7 @@ func checkpointErrorDestroy(ctx context.Context, cfg *config.Config, tls *common
}

if cfg.TikvImporter.Backend == "importer" {
importer, err := kv.NewImporter(ctx, tls, cfg.TikvImporter.Addr, cfg.TiDB.PdAddr)
importer, err := importer.NewImporter(ctx, tls, cfg.TikvImporter.Addr, cfg.TiDB.PdAddr)
if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -259,8 +262,8 @@ func checkpointErrorDestroy(ctx context.Context, cfg *config.Config, tls *common
for _, table := range targetTables {
for engineID := table.MinEngineID; engineID <= table.MaxEngineID; engineID++ {
fmt.Fprintln(os.Stderr, "Closing and cleaning up engine:", table.TableName, engineID)
_, eID := kv.MakeUUID(table.TableName, engineID)
file := kv.LocalFile{Uuid: eID}
_, eID := backend.MakeUUID(table.TableName, engineID)
file := local.File{Uuid: eID}
err := file.Cleanup(cfg.TikvImporter.SortedKVDir)
if err != nil {
fmt.Fprintln(os.Stderr, "* Encountered error while cleanup engine:", err)
Expand Down Expand Up @@ -357,7 +360,7 @@ func getLocalStoringTables(ctx context.Context, cfg *config.Config) (err2 error)
return nil
}

func unsafeCloseEngine(ctx context.Context, importer kv.Backend, engine string) (*kv.ClosedEngine, error) {
func unsafeCloseEngine(ctx context.Context, importer backend.Backend, engine string) (*backend.ClosedEngine, error) {
if index := strings.LastIndexByte(engine, ':'); index >= 0 {
tableName := engine[:index]
engineID, err := strconv.Atoi(engine[index+1:])
Expand All @@ -378,7 +381,7 @@ func unsafeCloseEngine(ctx context.Context, importer kv.Backend, engine string)
}

func importEngine(ctx context.Context, cfg *config.Config, tls *common.TLS, engine string) error {
importer, err := kv.NewImporter(ctx, tls, cfg.TikvImporter.Addr, cfg.TiDB.PdAddr)
importer, err := importer.NewImporter(ctx, tls, cfg.TikvImporter.Addr, cfg.TiDB.PdAddr)
if err != nil {
return errors.Trace(err)
}
Expand All @@ -392,7 +395,7 @@ func importEngine(ctx context.Context, cfg *config.Config, tls *common.TLS, engi
}

func cleanupEngine(ctx context.Context, cfg *config.Config, tls *common.TLS, engine string) error {
importer, err := kv.NewImporter(ctx, tls, cfg.TikvImporter.Addr, cfg.TiDB.PdAddr)
importer, err := importer.NewImporter(ctx, tls, cfg.TikvImporter.Addr, cfg.TiDB.PdAddr)
if err != nil {
return errors.Trace(err)
}
Expand Down
70 changes: 10 additions & 60 deletions pkg/lightning/backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,16 @@ import (
"github.com/pingcap/failpoint"
"github.com/pingcap/parser/model"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/types"
"go.uber.org/zap"

"github.com/pingcap/br/pkg/lightning/backend/kv"
"github.com/pingcap/br/pkg/lightning/common"
"github.com/pingcap/br/pkg/lightning/log"
"github.com/pingcap/br/pkg/lightning/metric"
"github.com/pingcap/br/pkg/lightning/verification"
)

const (
maxRetryTimes = 3 // tikv-importer has done retry internally. so we don't retry many times.
importMaxRetryTimes = 3 // tikv-importer has done retry internally. so we don't retry many times.
)

/*
Expand Down Expand Up @@ -102,7 +101,7 @@ type AbstractBackend interface {
Close()

// MakeEmptyRows creates an empty collection of encoded rows.
MakeEmptyRows() Rows
MakeEmptyRows() kv.Rows

// RetryImportDelay returns the duration to sleep when retrying an import
RetryImportDelay() time.Duration
Expand All @@ -112,7 +111,7 @@ type AbstractBackend interface {
ShouldPostProcess() bool

// NewEncoder creates an encoder of a TiDB table.
NewEncoder(tbl table.Table, options *SessionOptions) (Encoder, error)
NewEncoder(tbl table.Table, options *kv.SessionOptions) (kv.Encoder, error)

OpenEngine(ctx context.Context, engineUUID uuid.UUID) error

Expand Down Expand Up @@ -165,15 +164,6 @@ type AbstractBackend interface {
LocalWriter(ctx context.Context, engineUUID uuid.UUID) (EngineWriter, error)
}

func fetchRemoteTableModelsFromTLS(ctx context.Context, tls *common.TLS, schema string) ([]*model.TableInfo, error) {
var tables []*model.TableInfo
err := tls.GetJSON(ctx, "/schema/"+schema, &tables)
if err != nil {
return nil, errors.Annotatef(err, "cannot read schema '%s' from remote", schema)
}
return tables, nil
}

// Backend is the delivery target for Lightning
type Backend struct {
abstract AbstractBackend
Expand Down Expand Up @@ -221,11 +211,11 @@ func (be Backend) Close() {
be.abstract.Close()
}

func (be Backend) MakeEmptyRows() Rows {
func (be Backend) MakeEmptyRows() kv.Rows {
return be.abstract.MakeEmptyRows()
}

func (be Backend) NewEncoder(tbl table.Table, options *SessionOptions) (Encoder, error) {
func (be Backend) NewEncoder(tbl table.Table, options *kv.SessionOptions) (kv.Encoder, error) {
return be.abstract.NewEncoder(tbl, options)
}

Expand Down Expand Up @@ -353,7 +343,7 @@ func (engine *OpenedEngine) LocalWriter(ctx context.Context) (*LocalEngineWriter
}

// WriteRows writes a collection of encoded rows into the engine.
func (w *LocalEngineWriter) WriteRows(ctx context.Context, columnNames []string, rows Rows) error {
func (w *LocalEngineWriter) WriteRows(ctx context.Context, columnNames []string, rows kv.Rows) error {
return w.writer.AppendRows(ctx, w.tableName, columnNames, w.ts, rows)
}

Expand Down Expand Up @@ -398,7 +388,7 @@ func (en engine) unsafeClose(ctx context.Context) (*ClosedEngine, error) {
func (engine *ClosedEngine) Import(ctx context.Context) error {
var err error

for i := 0; i < maxRetryTimes; i++ {
for i := 0; i < importMaxRetryTimes; i++ {
task := engine.logger.With(zap.Int("retryCnt", i)).Begin(zap.InfoLevel, "import")
err = engine.backend.ImportEngine(ctx, engine.uuid)
if !common.IsRetryableError(err) {
Expand All @@ -409,7 +399,7 @@ func (engine *ClosedEngine) Import(ctx context.Context) error {
time.Sleep(engine.backend.RetryImportDelay())
}

return errors.Annotatef(err, "[%s] import reach max retry %d and still failed", engine.uuid, maxRetryTimes)
return errors.Annotatef(err, "[%s] import reach max retry %d and still failed", engine.uuid, importMaxRetryTimes)
}

// Cleanup deletes the intermediate data from target.
Expand All @@ -424,53 +414,13 @@ func (engine *ClosedEngine) Logger() log.Logger {
return engine.logger
}

// Encoder encodes a row of SQL values into some opaque type which can be
// consumed by OpenEngine.WriteEncoded.
type Encoder interface {
// Close the encoder.
Close()

// Encode encodes a row of SQL values into a backend-friendly format.
Encode(
logger log.Logger,
row []types.Datum,
rowID int64,
columnPermutation []int,
) (Row, error)
}

// Row represents a single encoded row.
type Row interface {
// ClassifyAndAppend separates the data-like and index-like parts of the
// encoded row, and appends these parts into the existing buffers and
// checksums.
ClassifyAndAppend(
data *Rows,
dataChecksum *verification.KVChecksum,
indices *Rows,
indexChecksum *verification.KVChecksum,
)
}

// Rows represents a collection of encoded rows.
type Rows interface {
// SplitIntoChunks splits the rows into multiple consecutive parts, each
// part having total byte size less than `splitSize`. The meaning of "byte
// size" should be consistent with the value used in `Row.ClassifyAndAppend`.
SplitIntoChunks(splitSize int) []Rows

// Clear returns a new collection with empty content. It may share the
// capacity with the current instance. The typical usage is `x = x.Clear()`.
Clear() Rows
}

type EngineWriter interface {
AppendRows(
ctx context.Context,
tableName string,
columnNames []string,
commitTS uint64,
rows Rows,
rows kv.Rows,
) error
Close() error
}
14 changes: 10 additions & 4 deletions pkg/lightning/backend/backend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package backend_test

import (
"context"
"testing"
"time"

"github.com/golang/mock/gomock"
Expand All @@ -11,26 +12,31 @@ import (
"github.com/pingcap/parser/mysql"
"github.com/pingcap/tidb/store/tikv/oracle"

kv "github.com/pingcap/br/pkg/lightning/backend"
"github.com/pingcap/br/pkg/lightning/backend"
"github.com/pingcap/br/pkg/lightning/backend/kv"
"github.com/pingcap/br/pkg/mock"
)

type backendSuite struct {
controller *gomock.Controller
mockBackend *mock.MockBackend
backend kv.Backend
backend backend.Backend
ts uint64
}

var _ = Suite(&backendSuite{})

func Test(t *testing.T) {
TestingT(t)
}

// FIXME: Cannot use the real SetUpTest/TearDownTest to set up the mock
// otherwise the mock error will be ignored.

func (s *backendSuite) setUpTest(c *C) {
s.controller = gomock.NewController(c)
s.mockBackend = mock.NewMockBackend(s.controller)
s.backend = kv.MakeBackend(s.mockBackend)
s.backend = backend.MakeBackend(s.mockBackend)
s.ts = oracle.ComposeTS(time.Now().Unix()*1000, 0)
}

Expand Down Expand Up @@ -334,7 +340,7 @@ func (s *backendSuite) TestCheckDiskQuota(c *C) {
uuid7 := uuid.MustParse("77777777-7777-7777-7777-777777777777")
uuid9 := uuid.MustParse("99999999-9999-9999-9999-999999999999")

fileSizes := []kv.EngineFileSize{
fileSizes := []backend.EngineFileSize{
{
UUID: uuid1,
DiskSize: 1000,
Expand Down
Loading

0 comments on commit c01fcc7

Please sign in to comment.