Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

lightning: add precheck about downstream CDC/PiTR #39338

Merged
merged 14 commits into from
Nov 30, 2022
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -442,7 +442,7 @@ bazel_golangcilinter:
-- run $$($(PACKAGE_DIRECTORIES)) --config ./.golangci.yaml

bazel_brietest: failpoint-enable bazel_ci_prepare
bazel $(BAZEL_GLOBAL_CONFIG) test $(BAZEL_CMD_CONFIG) --remote_download_minimal --test_arg=-with-real-tikv \
bazel $(BAZEL_GLOBAL_CONFIG) test $(BAZEL_CMD_CONFIG) --test_arg=-with-real-tikv \
-- //tests/realtikvtest/brietest/...

bazel_pessimistictest: failpoint-enable bazel_ci_prepare
Expand Down
1 change: 1 addition & 0 deletions br/pkg/lightning/common/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ var (
ErrCheckTableEmpty = errors.Normalize("check table empty error", errors.RFCCodeText("Lightning:PreCheck:ErrCheckTableEmpty"))
ErrCheckCSVHeader = errors.Normalize("check csv header error", errors.RFCCodeText("Lightning:PreCheck:ErrCheckCSVHeader"))
ErrCheckDataSource = errors.Normalize("check data source error", errors.RFCCodeText("Lightning:PreCheck:ErrCheckDataSource"))
ErrCheckCDCPiTR = errors.Normalize("check TiCDC/PiTR task error", errors.RFCCodeText("Lightning:PreCheck:ErrCheckCDCPiTR"))

ErrOpenCheckpoint = errors.Normalize("open checkpoint error", errors.RFCCodeText("Lightning:Checkpoint:ErrOpenCheckpoint"))
ErrReadCheckpoint = errors.Normalize("read checkpoint error", errors.RFCCodeText("Lightning:Checkpoint:ErrReadCheckpoint"))
Expand Down
7 changes: 7 additions & 0 deletions br/pkg/lightning/restore/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ go_library(
"//br/pkg/pdutil",
"//br/pkg/redact",
"//br/pkg/storage",
"//br/pkg/streamhelper",
"//br/pkg/utils",
"//br/pkg/version",
"//br/pkg/version/build",
Expand Down Expand Up @@ -77,6 +78,9 @@ go_library(
"@com_github_pingcap_tipb//go-tipb",
"@com_github_tikv_client_go_v2//oracle",
"@com_github_tikv_pd_client//:client",
"@io_etcd_go_etcd_client_v3//:client",
"@org_golang_google_grpc//:grpc",
"@org_golang_google_grpc//keepalive",
"@org_golang_x_exp//maps",
"@org_golang_x_exp//slices",
"@org_golang_x_sync//errgroup",
Expand Down Expand Up @@ -124,6 +128,7 @@ go_test(
"//br/pkg/lightning/worker",
"//br/pkg/mock",
"//br/pkg/storage",
"//br/pkg/streamhelper",
"//br/pkg/version/build",
"//ddl",
"//errno",
Expand Down Expand Up @@ -158,6 +163,8 @@ go_test(
"@com_github_tikv_pd_client//:client",
"@com_github_xitongsys_parquet_go//writer",
"@com_github_xitongsys_parquet_go_source//buffer",
"@io_etcd_go_etcd_client_v3//:client",
"@io_etcd_go_etcd_tests_v3//integration",
"@org_uber_go_atomic//:atomic",
"@org_uber_go_zap//:zap",
],
Expand Down
7 changes: 7 additions & 0 deletions br/pkg/lightning/restore/check_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,3 +155,10 @@ func (rc *Controller) checkSourceSchema(ctx context.Context) error {
}
return rc.doPreCheckOnItem(ctx, CheckSourceSchemaValid)
}

func (rc *Controller) checkCDCPiTR(ctx context.Context) error {
if rc.cfg.TikvImporter.Backend == config.BackendTiDB {
return nil
}
return rc.doPreCheckOnItem(ctx, CheckTargetUsingCDCPITR)
}
3 changes: 3 additions & 0 deletions br/pkg/lightning/restore/precheck.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ const (
CheckTargetClusterVersion CheckItemID = "CHECK_TARGET_CLUSTER_VERSION"
CheckLocalDiskPlacement CheckItemID = "CHECK_LOCAL_DISK_PLACEMENT"
CheckLocalTempKVDir CheckItemID = "CHECK_LOCAL_TEMP_KV_DIR"
CheckTargetUsingCDCPITR CheckItemID = "CHECK_TARGET_USING_CDC_PITR"
)

type CheckResult struct {
Expand Down Expand Up @@ -139,6 +140,8 @@ func (b *PrecheckItemBuilder) BuildPrecheckItem(checkID CheckItemID) (PrecheckIt
return NewLocalDiskPlacementCheckItem(b.cfg), nil
case CheckLocalTempKVDir:
return NewLocalTempKVDirCheckItem(b.cfg, b.preInfoGetter), nil
case CheckTargetUsingCDCPITR:
return NewCDCPITRCheckItem(b.cfg), nil
default:
return nil, errors.Errorf("unsupported check item: %v", checkID)
}
Expand Down
154 changes: 154 additions & 0 deletions br/pkg/lightning/restore/precheck_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,15 @@
package restore

import (
"bytes"
"context"
"fmt"
"path/filepath"
"reflect"
"strconv"
"strings"
"sync"
"time"

"github.com/docker/go-units"
"github.com/pingcap/errors"
Expand All @@ -32,6 +34,7 @@ import (
"github.com/pingcap/tidb/br/pkg/lightning/log"
"github.com/pingcap/tidb/br/pkg/lightning/mydump"
"github.com/pingcap/tidb/br/pkg/storage"
"github.com/pingcap/tidb/br/pkg/streamhelper"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/store/pdtypes"
Expand All @@ -40,9 +43,12 @@ import (
"github.com/pingcap/tidb/util/engine"
"github.com/pingcap/tidb/util/mathutil"
"github.com/pingcap/tidb/util/set"
clientv3 "go.etcd.io/etcd/client/v3"
"go.uber.org/zap"
"golang.org/x/exp/slices"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"
"google.golang.org/grpc/keepalive"
)

type clusterResourceCheckItem struct {
Expand Down Expand Up @@ -672,6 +678,154 @@ func (ci *checkpointCheckItem) checkpointIsValid(ctx context.Context, tableInfo
return msgs, nil
}

// CDCPITRCheckItem check downstream has enabled CDC or PiTR. It's exposed to let
// caller override the Instruction message.
type CDCPITRCheckItem struct {
cfg *config.Config
Instruction string
// used in test
etcdCli *clientv3.Client
}

// NewCDCPITRCheckItem creates a checker to check downstream has enabled CDC or PiTR.
func NewCDCPITRCheckItem(cfg *config.Config) PrecheckItem {
return &CDCPITRCheckItem{
cfg: cfg,
Instruction: "local backend is not compatible with them. Please switch to tidb backend then try again.",
}
}

// GetCheckItemID implements PrecheckItem interface.
func (ci *CDCPITRCheckItem) GetCheckItemID() CheckItemID {
return CheckTargetUsingCDCPITR
}

func dialEtcdWithCfg(ctx context.Context, cfg *config.Config) (*clientv3.Client, error) {
cfg2, err := cfg.ToTLS()
if err != nil {
return nil, err
}
tlsConfig := cfg2.TLSConfig()

return clientv3.New(clientv3.Config{
TLS: tlsConfig,
Endpoints: []string{cfg.TiDB.PdAddr},
AutoSyncInterval: 30 * time.Second,
DialTimeout: 5 * time.Second,
DialOptions: []grpc.DialOption{
grpc.WithKeepaliveParams(keepalive.ClientParameters{
Time: 10 * time.Second,
Timeout: 3 * time.Second,
PermitWithoutStream: false,
}),
grpc.WithBlock(),
grpc.WithReturnConnectionError(),
},
Context: ctx,
})
}

// Check implements PrecheckItem interface.
func (ci *CDCPITRCheckItem) Check(ctx context.Context) (*CheckResult, error) {
theResult := &CheckResult{
Item: ci.GetCheckItemID(),
Severity: Critical,
}

if ci.cfg.TikvImporter.Backend != config.BackendLocal {
theResult.Passed = true
theResult.Message = "TiDB Lightning is not using local backend, skip this check"
return theResult, nil
}

if ci.etcdCli == nil {
var err error
ci.etcdCli, err = dialEtcdWithCfg(ctx, ci.cfg)
if err != nil {
return nil, errors.Trace(err)
}
//nolint: errcheck
defer ci.etcdCli.Close()
}

errorMsg := make([]string, 0, 2)

pitrCli := streamhelper.NewMetaDataClient(ci.etcdCli)
tasks, err := pitrCli.GetAllTasks(ctx)
if err != nil {
return nil, errors.Trace(err)
}
if len(tasks) > 0 {
names := make([]string, 0, len(tasks))
for _, task := range tasks {
names = append(names, task.Info.GetName())
}
errorMsg = append(errorMsg, fmt.Sprintf("found PiTR log streaming task(s): %v,", names))
}

// check etcd KV of CDC >= v6.2
cdcPrefix := "/tidb/cdc/"
capturePath := []byte("/__cdc_meta__/capture/")
nameSet := make(map[string][]string, 1)
resp, err := ci.etcdCli.Get(ctx, cdcPrefix, clientv3.WithPrefix(), clientv3.WithKeysOnly())
if err != nil {
return nil, errors.Trace(err)
}
for _, kv := range resp.Kvs {
// example: /tidb/cdc/<clusterID>/__cdc_meta__/capture/<captureID>
k := kv.Key[len(cdcPrefix):]
clusterID, captureID, found := bytes.Cut(k, capturePath)
if found {
nameSet[string(clusterID)] = append(nameSet[string(clusterID)], string(captureID))
}
}
if len(nameSet) == 0 {
// check etcd KV of CDC <= v6.1
cdcPrefixV61 := "/tidb/cdc/capture/"
resp, err = ci.etcdCli.Get(ctx, cdcPrefixV61, clientv3.WithPrefix(), clientv3.WithKeysOnly())
if err != nil {
return nil, errors.Trace(err)
}
for _, kv := range resp.Kvs {
// example: /tidb/cdc/capture/<captureID>
k := kv.Key[len(cdcPrefixV61):]
if len(k) == 0 {
continue
}
nameSet["<nil>"] = append(nameSet["<nil>"], string(k))
}
}

if len(nameSet) > 0 {
var captureMsgBuf strings.Builder
captureMsgBuf.WriteString("found CDC capture(s): ")
isFirst := true
for clusterID, captureIDs := range nameSet {
if !isFirst {
captureMsgBuf.WriteString(", ")
}
isFirst = false
captureMsgBuf.WriteString("clusterID: ")
captureMsgBuf.WriteString(clusterID)
captureMsgBuf.WriteString(" captureID(s): ")
captureMsgBuf.WriteString(fmt.Sprintf("%v", captureIDs))
}
captureMsgBuf.WriteString(",")
errorMsg = append(errorMsg, captureMsgBuf.String())
}

if len(errorMsg) > 0 {
errorMsg = append(errorMsg, ci.Instruction)
theResult.Passed = false
theResult.Message = strings.Join(errorMsg, "\n")
} else {
theResult.Passed = true
theResult.Message = "no CDC or PiTR task found"
}

return theResult, nil
}

type schemaCheckItem struct {
cfg *config.Config
preInfoGetter PreRestoreInfoGetter
Expand Down
87 changes: 87 additions & 0 deletions br/pkg/lightning/restore/precheck_impl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,11 @@ import (
"github.com/pingcap/tidb/br/pkg/lightning/log"
"github.com/pingcap/tidb/br/pkg/lightning/restore/mock"
ropts "github.com/pingcap/tidb/br/pkg/lightning/restore/opts"
"github.com/pingcap/tidb/br/pkg/storage"
"github.com/pingcap/tidb/br/pkg/streamhelper"
"github.com/stretchr/testify/suite"
clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/tests/v3/integration"
)

type precheckImplSuite struct {
Expand Down Expand Up @@ -581,3 +585,86 @@ func (s *precheckImplSuite) TestTableEmptyCheckBasic() {
s.T().Logf("check result message: %s", result.Message)
s.Require().False(result.Passed)
}

func (s *precheckImplSuite) TestCDCPITRCheckItem() {
integration.BeforeTestExternal(s.T())
testEtcdCluster := integration.NewClusterV3(s.T(), &integration.ClusterConfig{Size: 1})
defer testEtcdCluster.Terminate(s.T())

ctx := context.Background()
cfg := &config.Config{
TikvImporter: config.TikvImporter{
Backend: config.BackendLocal,
},
}
ci := NewCDCPITRCheckItem(cfg)
checker := ci.(*CDCPITRCheckItem)
checker.etcdCli = testEtcdCluster.RandClient()
result, err := ci.Check(ctx)
s.Require().NoError(err)
s.Require().NotNil(result)
s.Require().Equal(ci.GetCheckItemID(), result.Item)
s.Require().Equal(Critical, result.Severity)
s.Require().True(result.Passed)
s.Require().Equal("no CDC or PiTR task found", result.Message)

cli := testEtcdCluster.RandClient()
brCli := streamhelper.NewMetaDataClient(cli)
backend, _ := storage.ParseBackend("noop://", nil)
taskInfo, err := streamhelper.NewTaskInfo("br_name").
FromTS(1).
UntilTS(1000).
WithTableFilter("*.*", "!mysql").
ToStorage(backend).
Check()
s.Require().NoError(err)
err = brCli.PutTask(ctx, *taskInfo)
s.Require().NoError(err)
checkEtcdPut := func(key string) {
_, err := cli.Put(ctx, key, "")
s.Require().NoError(err)
}
// TiCDC >= v6.2
checkEtcdPut("/tidb/cdc/default/__cdc_meta__/capture/3ecd5c98-0148-4086-adfd-17641995e71f")
checkEtcdPut("/tidb/cdc/default/__cdc_meta__/meta/meta-version")
checkEtcdPut("/tidb/cdc/default/__cdc_meta__/meta/ticdc-delete-etcd-key-count")
checkEtcdPut("/tidb/cdc/default/__cdc_meta__/owner/22318498f4dd6639")
checkEtcdPut("/tidb/cdc/default/default/changefeed/info/test")
checkEtcdPut("/tidb/cdc/default/default/changefeed/info/test-1")
checkEtcdPut("/tidb/cdc/default/default/changefeed/status/test")
checkEtcdPut("/tidb/cdc/default/default/changefeed/status/test-1")
checkEtcdPut("/tidb/cdc/default/default/task/position/3ecd5c98-0148-4086-adfd-17641995e71f/test-1")
checkEtcdPut("/tidb/cdc/default/default/upstream/7168358383033671922")

result, err = ci.Check(ctx)
s.Require().NoError(err)
s.Require().False(result.Passed)
s.Require().Equal("found PiTR log streaming task(s): [br_name],\n"+
"found CDC capture(s): clusterID: default captureID(s): [3ecd5c98-0148-4086-adfd-17641995e71f],\n"+
"local backend is not compatible with them. Please switch to tidb backend then try again.",
result.Message)

_, err = cli.Delete(ctx, "/tidb/cdc/", clientv3.WithPrefix())
s.Require().NoError(err)

// TiCDC <= v6.1
checkEtcdPut("/tidb/cdc/capture/f14cb04d-5ba1-410e-a59b-ccd796920e9d")
checkEtcdPut("/tidb/cdc/changefeed/info/test")
checkEtcdPut("/tidb/cdc/job/test")
checkEtcdPut("/tidb/cdc/owner/223184ad80a88b0b")
checkEtcdPut("/tidb/cdc/task/position/f14cb04d-5ba1-410e-a59b-ccd796920e9d/test")

result, err = ci.Check(ctx)
s.Require().NoError(err)
s.Require().False(result.Passed)
s.Require().Equal("found PiTR log streaming task(s): [br_name],\n"+
"found CDC capture(s): clusterID: <nil> captureID(s): [f14cb04d-5ba1-410e-a59b-ccd796920e9d],\n"+
"local backend is not compatible with them. Please switch to tidb backend then try again.",
result.Message)

checker.cfg.TikvImporter.Backend = config.BackendTiDB
result, err = ci.Check(ctx)
s.Require().NoError(err)
s.Require().True(result.Passed)
s.Require().Equal("TiDB Lightning is not using local backend, skip this check", result.Message)
}
4 changes: 4 additions & 0 deletions br/pkg/lightning/restore/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -2136,6 +2136,10 @@ func (rc *Controller) preCheckRequirements(ctx context.Context) error {
return common.ErrCheckClusterRegion.Wrap(err).GenWithStackByArgs()
}
}
// even if checkpoint exists, we still need to make sure CDC/PiTR task is not running.
if err := rc.checkCDCPiTR(ctx); err != nil {
return common.ErrCheckCDCPiTR.Wrap(err).GenWithStackByArgs()
}
}
}

Expand Down
Loading