From 9bc2310aac804c9b59968c2ed3d9fa7ae756e95d Mon Sep 17 00:00:00 2001 From: Ling Jin Date: Wed, 16 Jun 2021 07:48:38 -0500 Subject: [PATCH 01/12] This is an automated cherry-pick of #1879 Signed-off-by: ti-chi-bot --- cdc/processor.go | 14 +- cdc/puller/sorter/backend_pool.go | 4 + cdc/puller/sorter/backend_pool_test.go | 36 ++++- cdc/puller/sorter/sorter_test.go | 51 +++++-- cdc/server.go | 132 ++++++++++++++++++ cdc/server_test.go | 25 ++++ cmd/client_changefeed.go | 56 ++++---- cmd/client_changefeed_test.go | 20 ++- cmd/server.go | 17 ++- cmd/server_test.go | 46 +++++- errors.toml | 20 +++ go.mod | 5 + pkg/config/config.go | 19 ++- pkg/config/config_test.go | 8 ++ pkg/errors/errors.go | 4 + pkg/util/fileutil.go | 88 ++++++++++++ pkg/util/fileutil_test.go | 41 ++++++ tests/_utils/run_cdc_server | 11 +- tests/unified_sorter_sort_dir_conflict/run.sh | 2 +- 19 files changed, 530 insertions(+), 69 deletions(-) diff --git a/cdc/processor.go b/cdc/processor.go index f3b4b3f422b..c8e07218dc1 100644 --- a/cdc/processor.go +++ b/cdc/processor.go @@ -85,7 +85,7 @@ type oldProcessor struct { globalResolvedTs uint64 localResolvedTs uint64 checkpointTs uint64 - globalcheckpointTs uint64 + globalCheckpointTs uint64 appliedLocalCheckpointTs uint64 flushCheckpointInterval time.Duration @@ -241,7 +241,7 @@ func newProcessor( } if err == nil { - p.globalcheckpointTs = info.CheckpointTs + p.globalCheckpointTs = info.CheckpointTs } for tableID, replicaInfo := range p.status.Tables { @@ -679,7 +679,7 @@ func (p *oldProcessor) globalStatusWorker(ctx context.Context) error { ) updateStatus := func(changefeedStatus *model.ChangeFeedStatus) { - atomic.StoreUint64(&p.globalcheckpointTs, changefeedStatus.CheckpointTs) + atomic.StoreUint64(&p.globalCheckpointTs, changefeedStatus.CheckpointTs) if lastResolvedTs == changefeedStatus.ResolvedTs && lastCheckPointTs == changefeedStatus.CheckpointTs { return @@ -786,15 +786,15 @@ func (p *oldProcessor) addTable(ctx context.Context, tableID int64, replicaInfo return } - globalcheckpointTs := atomic.LoadUint64(&p.globalcheckpointTs) + globalCheckpointTs := atomic.LoadUint64(&p.globalCheckpointTs) - if replicaInfo.StartTs < globalcheckpointTs { - // use Warn instead of Panic in case that p.globalcheckpointTs has not been initialized. + if replicaInfo.StartTs < globalCheckpointTs { + // use Warn instead of Panic in case that p.globalCheckpointTs has not been initialized. // The cdc_state_checker will catch a real inconsistency in integration tests. log.Warn("addTable: startTs < checkpoint", util.ZapFieldChangefeed(ctx), zap.Int64("tableID", tableID), - zap.Uint64("checkpoint", globalcheckpointTs), + zap.Uint64("checkpoint", globalCheckpointTs), zap.Uint64("startTs", replicaInfo.StartTs)) } diff --git a/cdc/puller/sorter/backend_pool.go b/cdc/puller/sorter/backend_pool.go index 61997752119..ba12e90c7c1 100644 --- a/cdc/puller/sorter/backend_pool.go +++ b/cdc/puller/sorter/backend_pool.go @@ -193,6 +193,10 @@ func (p *backEndPool) alloc(ctx context.Context) (backEnd, error) { zap.Int64("table-id", tableID), zap.String("table-name", tableName)) + if err := util.CheckDataDirSatisfied(); err != nil { + return nil, errors.Trace(err) + } + ret, err := newFileBackEnd(fname, &msgPackGenSerde{}) if err != nil { return nil, errors.Trace(err) diff --git a/cdc/puller/sorter/backend_pool_test.go b/cdc/puller/sorter/backend_pool_test.go index 8f4cec331bf..d01bfd983cd 100644 --- a/cdc/puller/sorter/backend_pool_test.go +++ b/cdc/puller/sorter/backend_pool_test.go @@ -17,6 +17,7 @@ import ( "context" "fmt" "os" + "path/filepath" "strconv" "time" @@ -34,10 +35,17 @@ var _ = check.SerialSuites(&backendPoolSuite{}) func (s *backendPoolSuite) TestBasicFunction(c *check.C) { defer testleak.AfterTest(c)() - err := os.MkdirAll("/tmp/sorter", 0o755) + dataDir := "/tmp/cdc_data" + err := os.MkdirAll(dataDir, 0o755) + c.Assert(err, check.IsNil) + + sortDir := filepath.Join(dataDir, config.DefaultSortDir) + err = os.MkdirAll(sortDir, 0o755) c.Assert(err, check.IsNil) conf := config.GetDefaultServerConfig() + conf.DataDir = dataDir + conf.Sorter.SortDir = sortDir conf.Sorter.MaxMemoryPressure = 90 // 90% conf.Sorter.MaxMemoryConsumption = 16 * 1024 * 1024 * 1024 // 16G config.StoreGlobalServerConfig(conf) @@ -48,7 +56,7 @@ func (s *backendPoolSuite) TestBasicFunction(c *check.C) { ctx, cancel := context.WithTimeout(context.Background(), time.Second*20) defer cancel() - backEndPool, err := newBackEndPool("/tmp/sorter", "") + backEndPool, err := newBackEndPool(sortDir, "") c.Assert(err, check.IsNil) c.Assert(backEndPool, check.NotNil) defer backEndPool.terminate() @@ -103,14 +111,21 @@ func (s *backendPoolSuite) TestBasicFunction(c *check.C) { func (s *backendPoolSuite) TestDirectoryBadPermission(c *check.C) { defer testleak.AfterTest(c)() - dir := c.MkDir() - err := os.Chmod(dir, 0o311) // no permission to `ls` + dataDir := "tmp/cdc_data" + sortDir := filepath.Join(dataDir, config.DefaultSortDir) + err := os.MkdirAll(sortDir, 0o311) c.Assert(err, check.IsNil) +<<<<<<< HEAD conf := config.GetDefaultServerConfig() +======= + conf := config.GetGlobalServerConfig() + conf.DataDir = dataDir + conf.Sorter.SortDir = sortDir +>>>>>>> 9135351d (CDC Server support data-dir (#1879)) conf.Sorter.MaxMemoryPressure = 0 // force using files - backEndPool, err := newBackEndPool(dir, "") + backEndPool, err := newBackEndPool(sortDir, "") c.Assert(err, check.IsNil) c.Assert(backEndPool, check.NotNil) defer backEndPool.terminate() @@ -131,10 +146,17 @@ func (s *backendPoolSuite) TestDirectoryBadPermission(c *check.C) { func (s *backendPoolSuite) TestCleanUpSelf(c *check.C) { defer testleak.AfterTest(c)() - err := os.MkdirAll("/tmp/sorter", 0o755) + dataDir := c.MkDir() + err := os.Chmod(dataDir, 0o755) + c.Assert(err, check.IsNil) + + sorterDir := filepath.Join(dataDir, config.DefaultSortDir) + err = os.MkdirAll(sorterDir, 0o755) c.Assert(err, check.IsNil) conf := config.GetDefaultServerConfig() + conf.DataDir = dataDir + conf.Sorter.SortDir = sorterDir conf.Sorter.MaxMemoryPressure = 90 // 90% conf.Sorter.MaxMemoryConsumption = 16 * 1024 * 1024 * 1024 // 16G config.StoreGlobalServerConfig(conf) @@ -142,7 +164,7 @@ func (s *backendPoolSuite) TestCleanUpSelf(c *check.C) { err = failpoint.Enable("github.com/pingcap/ticdc/cdc/puller/sorter/memoryPressureInjectPoint", "return(100)") c.Assert(err, check.IsNil) - backEndPool, err := newBackEndPool("/tmp/sorter", "") + backEndPool, err := newBackEndPool(sorterDir, "") c.Assert(err, check.IsNil) c.Assert(backEndPool, check.NotNil) diff --git a/cdc/puller/sorter/sorter_test.go b/cdc/puller/sorter/sorter_test.go index 77d2c5bd9dc..5ed2498cae2 100644 --- a/cdc/puller/sorter/sorter_test.go +++ b/cdc/puller/sorter/sorter_test.go @@ -17,12 +17,15 @@ import ( "context" "math" "os" + "path/filepath" "sync/atomic" "testing" "time" "go.uber.org/zap/zapcore" + _ "net/http/pprof" + "github.com/pingcap/check" "github.com/pingcap/failpoint" "github.com/pingcap/log" @@ -32,7 +35,6 @@ import ( "github.com/pingcap/ticdc/pkg/util/testleak" "go.uber.org/zap" "golang.org/x/sync/errgroup" - _ "net/http/pprof" ) const ( @@ -62,18 +64,21 @@ func (s *sorterSuite) TestSorterBasic(c *check.C) { defer UnifiedSorterCleanUp() conf := config.GetDefaultServerConfig() + conf.DataDir = "/tmp/cdc_data" + sortDir := filepath.Join(conf.DataDir, config.DefaultSortDir) conf.Sorter = &config.SorterConfig{ NumConcurrentWorker: 8, ChunkSizeLimit: 1 * 1024 * 1024 * 1024, MaxMemoryPressure: 60, MaxMemoryConsumption: 16 * 1024 * 1024 * 1024, NumWorkerPoolGoroutine: 4, + SortDir: sortDir, } config.StoreGlobalServerConfig(conf) - err := os.MkdirAll("/tmp/sorter", 0o755) + err := os.MkdirAll(conf.Sorter.SortDir, 0o755) c.Assert(err, check.IsNil) - sorter, err := NewUnifiedSorter("/tmp/sorter", "test-cf", "test", 0, "0.0.0.0:0") + sorter, err := NewUnifiedSorter(conf.Sorter.SortDir, "test-cf", "test", 0, "0.0.0.0:0") c.Assert(err, check.IsNil) ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute) @@ -87,18 +92,21 @@ func (s *sorterSuite) TestSorterCancel(c *check.C) { defer UnifiedSorterCleanUp() conf := config.GetDefaultServerConfig() + conf.DataDir = "/tmp/cdc_data" + sortDir := filepath.Join(conf.DataDir, config.DefaultSortDir) conf.Sorter = &config.SorterConfig{ NumConcurrentWorker: 8, ChunkSizeLimit: 1 * 1024 * 1024 * 1024, MaxMemoryPressure: 60, MaxMemoryConsumption: 0, NumWorkerPoolGoroutine: 4, + SortDir: sortDir, } config.StoreGlobalServerConfig(conf) - err := os.MkdirAll("/tmp/sorter", 0o755) + err := os.MkdirAll(conf.Sorter.SortDir, 0o755) c.Assert(err, check.IsNil) - sorter, err := NewUnifiedSorter("/tmp/sorter", "test-cf", "test", 0, "0.0.0.0:0") + sorter, err := NewUnifiedSorter(conf.Sorter.SortDir, "test-cf", "test", 0, "0.0.0.0:0") c.Assert(err, check.IsNil) ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) @@ -127,6 +135,11 @@ func testSorter(ctx context.Context, c *check.C, sorter puller.EventSorter, coun log.Panic("Could not enable failpoint", zap.Error(err)) } + c.Assert(failpoint.Enable("github.com/pingcap/ticdc/pkg/util/InjectCheckDataDirSatisfied", ""), check.IsNil) + defer func() { + c.Assert(failpoint.Disable("github.com/pingcap/ticdc/pkg/util/InjectCheckDataDirSatisfied"), check.IsNil) + }() + ctx, cancel := context.WithCancel(ctx) errg, ctx := errgroup.WithContext(ctx) errg.Go(func() error { @@ -284,16 +297,19 @@ func (s *sorterSuite) TestSorterCancelRestart(c *check.C) { defer UnifiedSorterCleanUp() conf := config.GetDefaultServerConfig() + conf.DataDir = "/tmp/cdc_data" + sortDir := filepath.Join(conf.DataDir, config.DefaultSortDir) conf.Sorter = &config.SorterConfig{ NumConcurrentWorker: 8, ChunkSizeLimit: 1 * 1024 * 1024 * 1024, MaxMemoryPressure: 0, // disable memory sort MaxMemoryConsumption: 0, NumWorkerPoolGoroutine: 4, + SortDir: sortDir, } config.StoreGlobalServerConfig(conf) - err := os.MkdirAll("/tmp/sorter", 0o755) + err := os.MkdirAll(conf.Sorter.SortDir, 0o755) c.Assert(err, check.IsNil) // enable the failpoint to simulate delays @@ -311,7 +327,7 @@ func (s *sorterSuite) TestSorterCancelRestart(c *check.C) { }() for i := 0; i < 5; i++ { - sorter, err := NewUnifiedSorter("/tmp/sorter", "test-cf", "test", 0, "0.0.0.0:0") + sorter, err := NewUnifiedSorter(conf.Sorter.SortDir, "test-cf", "test", 0, "0.0.0.0:0") c.Assert(err, check.IsNil) ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) err = testSorter(ctx, c, sorter, 100000000, true) @@ -328,18 +344,21 @@ func (s *sorterSuite) TestSorterIOError(c *check.C) { defer log.SetLevel(zapcore.InfoLevel) conf := config.GetDefaultServerConfig() + conf.DataDir = "/tmp/cdc_data" + sortDir := filepath.Join(conf.DataDir, config.DefaultSortDir) conf.Sorter = &config.SorterConfig{ NumConcurrentWorker: 8, ChunkSizeLimit: 1 * 1024 * 1024 * 1024, MaxMemoryPressure: 60, MaxMemoryConsumption: 0, NumWorkerPoolGoroutine: 4, + SortDir: sortDir, } config.StoreGlobalServerConfig(conf) - err := os.MkdirAll("/tmp/sorter", 0o755) + err := os.MkdirAll(conf.Sorter.SortDir, 0o755) c.Assert(err, check.IsNil) - sorter, err := NewUnifiedSorter("/tmp/sorter", "test-cf", "test", 0, "0.0.0.0:0") + sorter, err := NewUnifiedSorter(conf.Sorter.SortDir, "test-cf", "test", 0, "0.0.0.0:0") c.Assert(err, check.IsNil) ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) @@ -374,6 +393,13 @@ func (s *sorterSuite) TestSorterIOError(c *check.C) { _ = failpoint.Disable("github.com/pingcap/ticdc/cdc/puller/sorter/InjectErrorBackEndWrite") }() +<<<<<<< HEAD +======= + // recreate the sorter + sorter, err = NewUnifiedSorter(conf.Sorter.SortDir, "test-cf", "test", 0, "0.0.0.0:0") + c.Assert(err, check.IsNil) + +>>>>>>> 9135351d (CDC Server support data-dir (#1879)) finishedCh = make(chan struct{}) go func() { err := testSorter(ctx, c, sorter, 10000, true) @@ -397,18 +423,21 @@ func (s *sorterSuite) TestSorterErrorReportCorrect(c *check.C) { defer log.SetLevel(zapcore.InfoLevel) conf := config.GetDefaultServerConfig() + conf.DataDir = "/tmp/cdc_data" + sortDir := filepath.Join(conf.DataDir, config.DefaultSortDir) conf.Sorter = &config.SorterConfig{ NumConcurrentWorker: 8, ChunkSizeLimit: 1 * 1024 * 1024 * 1024, MaxMemoryPressure: 60, MaxMemoryConsumption: 0, NumWorkerPoolGoroutine: 4, + SortDir: sortDir, } config.StoreGlobalServerConfig(conf) - err := os.MkdirAll("/tmp/sorter", 0o755) + err := os.MkdirAll(conf.Sorter.SortDir, 0o755) c.Assert(err, check.IsNil) - sorter, err := NewUnifiedSorter("/tmp/sorter", "test-cf", "test", 0, "0.0.0.0:0") + sorter, err := NewUnifiedSorter(conf.Sorter.SortDir, "test-cf", "test", 0, "0.0.0.0:0") c.Assert(err, check.IsNil) ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) diff --git a/cdc/server.go b/cdc/server.go index 310e2bfbf56..bd2c783e91b 100644 --- a/cdc/server.go +++ b/cdc/server.go @@ -17,6 +17,8 @@ import ( "context" "fmt" "net/http" + "os" + "path/filepath" "strings" "sync" "time" @@ -42,6 +44,9 @@ import ( const ( ownerRunInterval = time.Millisecond * 500 + defaultDataDir = "/tmp/cdc_data" + // dataDirThreshold is used to warn if the free space of the specified data-dir is lower than it, unit is GB + dataDirThreshold = 500 ) // Server is the capture server @@ -95,6 +100,46 @@ func (s *Server) Run(ctx context.Context) error { return cerror.WrapError(cerror.ErrServerNewPDClient, err) } s.pdClient = pdClient +<<<<<<< HEAD +======= + if config.NewReplicaImpl { + tlsConfig, err := conf.Security.ToTLSConfig() + if err != nil { + return errors.Trace(err) + } + logConfig := logutil.DefaultZapLoggerConfig + logConfig.Level = zap.NewAtomicLevelAt(zapcore.ErrorLevel) + etcdCli, err := clientv3.New(clientv3.Config{ + Endpoints: s.pdEndpoints, + TLS: tlsConfig, + Context: ctx, + LogConfig: &logConfig, + DialTimeout: 5 * time.Second, + DialOptions: []grpc.DialOption{ + grpcTLSOption, + grpc.WithBlock(), + grpc.WithConnectParams(grpc.ConnectParams{ + Backoff: backoff.Config{ + BaseDelay: time.Second, + Multiplier: 1.1, + Jitter: 0.1, + MaxDelay: 3 * time.Second, + }, + MinConnectTimeout: 3 * time.Second, + }), + }, + }) + if err != nil { + return errors.Annotate(cerror.WrapError(cerror.ErrNewCaptureFailed, err), "new etcd client") + } + etcdClient := kv.NewCDCEtcdClient(ctx, etcdCli) + s.etcdClient = &etcdClient + } + + if err := s.initDataDir(ctx); err != nil { + return errors.Trace(err) + } +>>>>>>> 9135351d (CDC Server support data-dir (#1879)) // To not block CDC server startup, we need to warn instead of error // when TiKV is incompatible. @@ -296,3 +341,90 @@ func (s *Server) Close() { s.statusServer = nil } } + +func (s *Server) initDataDir(ctx context.Context) error { + if err := s.setUpDataDir(ctx); err != nil { + return errors.Trace(err) + } + conf := config.GetGlobalServerConfig() + err := os.MkdirAll(conf.DataDir, 0o755) + if err != nil { + return errors.Trace(err) + } + diskInfo, err := util.GetDiskInfo(conf.DataDir) + if err != nil { + return errors.Trace(err) + } + + if diskInfo.Avail < dataDirThreshold { + log.Warn(fmt.Sprintf("%s is set as data-dir (%dGB available), ticdc recommend disk for data-dir "+ + "at least have %dGB available space", conf.DataDir, diskInfo.Avail, dataDirThreshold)) + } + + return nil +} + +func (s *Server) setUpDataDir(ctx context.Context) error { + conf := config.GetGlobalServerConfig() + if conf.DataDir != "" { + conf.Sorter.SortDir = filepath.Join(conf.DataDir, config.DefaultSortDir) + config.StoreGlobalServerConfig(conf) + + return nil + } + + // data-dir will be decide by exist changefeed for backward compatibility + allStatus, err := s.etcdClient.GetAllChangeFeedStatus(ctx) + if err != nil { + return errors.Trace(err) + } + + candidates := make([]string, 0, len(allStatus)) + for id := range allStatus { + info, err := s.etcdClient.GetChangeFeedInfo(ctx, id) + if err != nil { + return errors.Trace(err) + } + if info.SortDir != "" { + candidates = append(candidates, info.SortDir) + } + } + + conf.DataDir = defaultDataDir + best, ok := findBestDataDir(candidates) + if ok { + conf.DataDir = best + } + + conf.Sorter.SortDir = filepath.Join(conf.DataDir, config.DefaultSortDir) + config.StoreGlobalServerConfig(conf) + return nil +} + +// try to find the best data dir by rules +// at the moment, only consider available disk space +func findBestDataDir(candidates []string) (result string, ok bool) { + var low uint64 = 0 + for _, dir := range candidates { + if err := util.IsDirReadWritable(dir); err != nil { + log.Warn("try to get disk info failed", zap.String("dir", dir), zap.Error(err)) + continue + } + info, err := util.GetDiskInfo(dir) + if err != nil { + log.Warn("try to get disk info failed", zap.String("dir", dir), zap.Error(err)) + continue + } + if info.Avail > low { + result = dir + low = info.Avail + ok = true + } + } + + if !ok && len(candidates) != 0 { + log.Warn("try to find directory for data-dir failed, use `/tmp/cdc_data` as data-dir", zap.Strings("candidates", candidates)) + } + + return result, ok +} diff --git a/cdc/server_test.go b/cdc/server_test.go index d6642960c9e..f8aa15dbc8b 100644 --- a/cdc/server_test.go +++ b/cdc/server_test.go @@ -16,6 +16,7 @@ package cdc import ( "context" "net/url" + "path/filepath" "time" "github.com/pingcap/check" @@ -81,3 +82,27 @@ func (s *serverSuite) TestEtcdHealthChecker(c *check.C) { time.Sleep(time.Second * 4) cancel() } + +func (s *serverSuite) TestInitDataDir(c *check.C) { + defer testleak.AfterTest(c)() + defer s.TearDownTest(c) + + ctx, cancel := context.WithCancel(context.Background()) + pdEndpoints := []string{ + "http://" + s.clientURL.Host, + "http://invalid-pd-host:2379", + } + server, err := NewServer(pdEndpoints) + c.Assert(err, check.IsNil) + c.Assert(server, check.NotNil) + + conf := config.GetGlobalServerConfig() + conf.DataDir = c.MkDir() + + err = server.initDataDir(ctx) + c.Assert(err, check.IsNil) + c.Assert(conf.DataDir, check.Not(check.Equals), "") + c.Assert(conf.Sorter.SortDir, check.Equals, filepath.Join(conf.DataDir, "/tmp/sorter")) + config.StoreGlobalServerConfig(conf) + cancel() +} diff --git a/cmd/client_changefeed.go b/cmd/client_changefeed.go index 7b1e6fe0d8d..05629de36c8 100644 --- a/cmd/client_changefeed.go +++ b/cmd/client_changefeed.go @@ -21,6 +21,7 @@ import ( "strings" "time" + "github.com/fatih/color" "github.com/google/uuid" "github.com/pingcap/errors" "github.com/pingcap/log" @@ -40,12 +41,6 @@ import ( "go.uber.org/zap" ) -const ( - // Use the empty string as the default to let the server local setting override the changefeed setting. - // TODO remove this when we change the changefeed `sort-dir` to no-op, which it currently is NOT. - defaultSortDir = "" -) - var forceEnableOldValueProtocols = []string{ "canal", "maxwell", @@ -88,7 +83,7 @@ func newAdminChangefeedCommand() []*cobra.Command { cmds := []*cobra.Command{ { Use: "pause", - Short: "Pause a replicaiton task (changefeed)", + Short: "Pause a replication task (changefeed)", RunE: func(cmd *cobra.Command, args []string) error { ctx := defaultContext job := model.AdminJob{ @@ -100,7 +95,7 @@ func newAdminChangefeedCommand() []*cobra.Command { }, { Use: "resume", - Short: "Resume a paused replicaiton task (changefeed)", + Short: "Resume a paused replication task (changefeed)", RunE: func(cmd *cobra.Command, args []string) error { ctx := defaultContext job := model.AdminJob{ @@ -115,7 +110,7 @@ func newAdminChangefeedCommand() []*cobra.Command { }, { Use: "remove", - Short: "Remove a replicaiton task (changefeed)", + Short: "Remove a replication task (changefeed)", RunE: func(cmd *cobra.Command, args []string) error { ctx := defaultContext job := model.AdminJob{ @@ -193,7 +188,7 @@ func newListChangefeedCommand() *cobra.Command { func newQueryChangefeedCommand() *cobra.Command { command := &cobra.Command{ Use: "query", - Short: "Query information and status of a replicaiton task (changefeed)", + Short: "Query information and status of a replication task (changefeed)", RunE: func(cmd *cobra.Command, args []string) error { ctx := defaultContext @@ -247,10 +242,10 @@ func newQueryChangefeedCommand() *cobra.Command { return command } -func verifyChangefeedParamers(ctx context.Context, cmd *cobra.Command, isCreate bool, credential *security.Credential, captureInfos []*model.CaptureInfo) (*model.ChangeFeedInfo, error) { +func verifyChangefeedParameters(ctx context.Context, cmd *cobra.Command, isCreate bool, credential *security.Credential, captureInfos []*model.CaptureInfo) (*model.ChangeFeedInfo, error) { if isCreate { if sinkURI == "" { - return nil, errors.New("Creating chengfeed without a sink-uri") + return nil, errors.New("Creating changefeed without a sink-uri") } if startTs == 0 { ts, logical, err := pdCli.GetTS(ctx) @@ -293,7 +288,7 @@ func verifyChangefeedParamers(ctx context.Context, cmd *cobra.Command, isCreate } if cyclicReplicaID != 0 || len(cyclicFilterReplicaIDs) != 0 { if !(cyclicReplicaID != 0 && len(cyclicFilterReplicaIDs) != 0) { - return nil, errors.New("invaild cyclic config, please make sure using " + + return nil, errors.New("invalid cyclic config, please make sure using " + "nonzero replica ID and specify filter replica IDs") } filter := make([]uint64, 0, len(cyclicFilterReplicaIDs)) @@ -318,7 +313,7 @@ func verifyChangefeedParamers(ctx context.Context, cmd *cobra.Command, isCreate protocol := sinkURIParsed.Query().Get("protocol") for _, fp := range forceEnableOldValueProtocols { if protocol == fp { - log.Warn("Attemping to replicate without old value enabled. CDC will enable old value and continue.", zap.String("protocol", protocol)) + log.Warn("Attempting to replicate without old value enabled. CDC will enable old value and continue.", zap.String("protocol", protocol)) cfg.EnableOldValue = true break } @@ -343,7 +338,8 @@ func verifyChangefeedParamers(ctx context.Context, cmd *cobra.Command, isCreate switch sortEngine { case model.SortUnified, model.SortInMemory, model.SortInFile: default: - return nil, errors.Errorf("Creating chengfeed with an invalid sort engine(%s), `%s`,`%s` and `%s` are optional.", sortEngine, model.SortUnified, model.SortInMemory, model.SortInFile) + return nil, errors.Errorf("Creating changefeed with an invalid sort engine(%s), "+ + "`%s`,`%s` and `%s` are optional.", sortEngine, model.SortUnified, model.SortInMemory, model.SortInFile) } info := &model.ChangeFeedInfo{ SinkURI: sinkURI, @@ -359,10 +355,12 @@ func verifyChangefeedParamers(ctx context.Context, cmd *cobra.Command, isCreate CreatorVersion: version.ReleaseVersion, } + // user is not allowed to set sort-dir at changefeed level if sortDir != "" { - cmd.Printf("[WARN] --sort-dir is deprecated in changefeed settings. " + - "Please use `cdc server --sort-dir` if possible. " + - "The sort-dir here will be no-op\n") + cmd.Printf(color.HiYellowString("[WARN] --sort-dir is deprecated in changefeed settings. " + + "Please use `cdc server --data-dir` to start the cdc server if possible, sort-dir will be set automatically. " + + "The --sort-dir here will be no-op\n")) + return nil, errors.New("Creating changefeed with `--sort-dir`, it's invalid") } if info.Engine == model.SortInFile { @@ -438,14 +436,14 @@ func changefeedConfigVariables(command *cobra.Command) { command.PersistentFlags().StringVar(&configFile, "config", "", "Path of the configuration file") command.PersistentFlags().StringSliceVar(&opts, "opts", nil, "Extra options, in the `key=value` format") command.PersistentFlags().StringVar(&sortEngine, "sort-engine", model.SortUnified, "sort engine used for data sort") - command.PersistentFlags().StringVar(&sortDir, "sort-dir", defaultSortDir, "directory used for data sort") + command.PersistentFlags().StringVar(&sortDir, "sort-dir", "", "directory used for data sort") command.PersistentFlags().StringVar(&timezone, "tz", "SYSTEM", "timezone used when checking sink uri (changefeed timezone is determined by cdc server)") - command.PersistentFlags().Uint64Var(&cyclicReplicaID, "cyclic-replica-id", 0, "(Expremental) Cyclic replication replica ID of changefeed") - command.PersistentFlags().UintSliceVar(&cyclicFilterReplicaIDs, "cyclic-filter-replica-ids", []uint{}, "(Expremental) Cyclic replication filter replica ID of changefeed") - command.PersistentFlags().BoolVar(&cyclicSyncDDL, "cyclic-sync-ddl", true, "(Expremental) Cyclic replication sync DDL of changefeed") - command.PersistentFlags().BoolVar(&syncPointEnabled, "sync-point", false, "(Expremental) Set and Record syncpoint in replication(default off)") - command.PersistentFlags().DurationVar(&syncPointInterval, "sync-interval", 10*time.Minute, "(Expremental) Set the interval for syncpoint in replication(default 10min)") - command.PersistentFlags().MarkHidden("sort-dir") //nolint:errcheck + command.PersistentFlags().Uint64Var(&cyclicReplicaID, "cyclic-replica-id", 0, "(Experimental) Cyclic replication replica ID of changefeed") + command.PersistentFlags().UintSliceVar(&cyclicFilterReplicaIDs, "cyclic-filter-replica-ids", []uint{}, "(Experimental) Cyclic replication filter replica ID of changefeed") + command.PersistentFlags().BoolVar(&cyclicSyncDDL, "cyclic-sync-ddl", true, "(Experimental) Cyclic replication sync DDL of changefeed") + command.PersistentFlags().BoolVar(&syncPointEnabled, "sync-point", false, "(Experimental) Set and Record syncpoint in replication(default off)") + command.PersistentFlags().DurationVar(&syncPointInterval, "sync-interval", 10*time.Minute, "(Experimental) Set the interval for syncpoint in replication(default 10min)") + _ = command.PersistentFlags().MarkHidden("sort-dir") //nolint:errcheck } func newCreateChangefeedCommand() *cobra.Command { @@ -464,7 +462,7 @@ func newCreateChangefeedCommand() *cobra.Command { if err != nil { return err } - info, err := verifyChangefeedParamers(ctx, cmd, true /* isCreate */, getCredential(), captureInfos) + info, err := verifyChangefeedParameters(ctx, cmd, true /* isCreate */, getCredential(), captureInfos) if err != nil { return err } @@ -594,7 +592,7 @@ func newUpdateChangefeedCommand() *cobra.Command { return err } if strings.ToLower(strings.TrimSpace(yOrN)) != "y" { - cmd.Printf("No upadte to changefeed.\n") + cmd.Printf("No update to changefeed.\n") return nil } } @@ -662,7 +660,7 @@ func newStatisticsChangefeedCommand() *cobra.Command { ReplicationGap: fmt.Sprintf("%dms", replicationGap), Count: count, } - jsonPrint(cmd, &statistics) + _ = jsonPrint(cmd, &statistics) lastCount = count lastTime = now } @@ -678,7 +676,7 @@ func newStatisticsChangefeedCommand() *cobra.Command { func newCreateChangefeedCyclicCommand() *cobra.Command { command := &cobra.Command{ Use: "cyclic", - Short: "(Expremental) Utility about cyclic replication", + Short: "(Experimental) Utility about cyclic replication", } command.AddCommand( &cobra.Command{ diff --git a/cmd/client_changefeed_test.go b/cmd/client_changefeed_test.go index be2bba2e7af..a7c6a7204e1 100644 --- a/cmd/client_changefeed_test.go +++ b/cmd/client_changefeed_test.go @@ -44,18 +44,30 @@ enable-old-value = false c.Assert(err, check.IsNil) sinkURI = "blackhole:///?protocol=maxwell" - info, err := verifyChangefeedParamers(ctx, cmd, false /* isCreate */, nil, nil) + info, err := verifyChangefeedParameters(ctx, cmd, false /* isCreate */, nil, nil) c.Assert(err, check.IsNil) c.Assert(info.Config.EnableOldValue, check.IsTrue) - c.Assert(info.SortDir, check.Equals, defaultSortDir) + c.Assert(info.SortDir, check.Equals, "") sinkURI = "" - _, err = verifyChangefeedParamers(ctx, cmd, true /* isCreate */, nil, nil) + _, err = verifyChangefeedParameters(ctx, cmd, true /* isCreate */, nil, nil) c.Assert(err, check.NotNil) sinkURI = "blackhole:///" - info, err = verifyChangefeedParamers(ctx, cmd, false /* isCreate */, nil, []*model.CaptureInfo{{Version: "4.0.0"}}) + info, err = verifyChangefeedParameters(ctx, cmd, false /* isCreate */, nil, []*model.CaptureInfo{{Version: "4.0.0"}}) c.Assert(err, check.IsNil) c.Assert(info.Config.EnableOldValue, check.IsFalse) c.Assert(info.Engine, check.Equals, model.SortInMemory) + + sortDir = "/tidb/data" + pdCli = &mockPDClient{} + disableGCSafePointCheck = true + _, err = verifyChangefeedParameters(ctx, cmd, false, nil, nil) + c.Assert(err, check.ErrorMatches, "*Creating changefeed with `--sort-dir`, it's invalid*") + _, err = verifyChangefeedParameters(ctx, cmd, true, nil, nil) + c.Assert(err, check.NotNil) + + sortDir = "" + _, err = verifyChangefeedParameters(ctx, cmd, false, nil, nil) + c.Assert(err, check.IsNil) } diff --git a/cmd/server.go b/cmd/server.go index 478c10ce06d..f82a05b5fb7 100644 --- a/cmd/server.go +++ b/cmd/server.go @@ -18,6 +18,7 @@ import ( "strings" "time" + "github.com/fatih/color" "github.com/pingcap/errors" "github.com/pingcap/log" "github.com/pingcap/ticdc/cdc" @@ -68,6 +69,7 @@ func initServerCmd(cmd *cobra.Command) { cmd.Flags().Int64Var(&serverConfig.GcTTL, "gc-ttl", defaultServerConfig.GcTTL, "CDC GC safepoint TTL duration, specified in seconds") cmd.Flags().StringVar(&serverConfig.LogFile, "log-file", defaultServerConfig.LogFile, "log file path") cmd.Flags().StringVar(&serverConfig.LogLevel, "log-level", defaultServerConfig.LogLevel, "log level (etc: debug|info|warn|error)") + cmd.Flags().StringVar(&serverConfig.DataDir, "data-dir", defaultServerConfig.DataDir, "the path to the directory used to store TiCDC-generated data") cmd.Flags().DurationVar((*time.Duration)(&serverConfig.OwnerFlushInterval), "owner-flush-interval", time.Duration(defaultServerConfig.OwnerFlushInterval), "owner flushes changefeed status interval") cmd.Flags().DurationVar((*time.Duration)(&serverConfig.ProcessorFlushInterval), "processor-flush-interval", time.Duration(defaultServerConfig.ProcessorFlushInterval), "processor flushes task status interval") @@ -83,6 +85,7 @@ func initServerCmd(cmd *cobra.Command) { addSecurityFlags(cmd.Flags(), true /* isServer */) cmd.Flags().StringVar(&serverConfigFilePath, "config", "", "Path of the configuration file") + _ = cmd.Flags().MarkHidden("sort-dir") //nolint:errcheck } func runEServer(cmd *cobra.Command, args []string) error { @@ -146,6 +149,8 @@ func loadAndVerifyServerConfig(cmd *cobra.Command) (*config.ServerConfig, error) conf.LogFile = serverConfig.LogFile case "log-level": conf.LogLevel = serverConfig.LogLevel + case "data-dir": + conf.DataDir = serverConfig.DataDir case "owner-flush-interval": conf.OwnerFlushInterval = serverConfig.OwnerFlushInterval case "processor-flush-interval": @@ -169,7 +174,12 @@ func loadAndVerifyServerConfig(cmd *cobra.Command) (*config.ServerConfig, error) case "cert-allowed-cn": conf.Security.CertAllowedCN = serverConfig.Security.CertAllowedCN case "sort-dir": - conf.Sorter.SortDir = serverConfig.Sorter.SortDir + // user specified sorter dir should not take effect + if serverConfig.Sorter.SortDir != config.DefaultSortDir { + cmd.Printf(color.HiYellowString("[WARN] --sort-dir is deprecated in server settings. " + + "sort-dir will be set to `{data-dir}/tmp/sorter`. The sort-dir here will be no-op\n")) + } + conf.Sorter.SortDir = config.DefaultSortDir case "pd", "config": // do nothing default: @@ -192,5 +202,10 @@ func loadAndVerifyServerConfig(cmd *cobra.Command) (*config.ServerConfig, error) } } + if conf.DataDir == "" { + cmd.Printf(color.HiYellowString("[WARN] TiCDC server data-dir is not set. " + + "Please use `cdc server --data-dir` to start the cdc server if possible.\n")) + } + return conf, nil } diff --git a/cmd/server_test.go b/cmd/server_test.go index 40a5e01519b..76b674a1a21 100644 --- a/cmd/server_test.go +++ b/cmd/server_test.go @@ -14,6 +14,7 @@ package cmd import ( + "fmt" "io/ioutil" "path/filepath" "time" @@ -36,6 +37,30 @@ func (s *serverSuite) TestPatchTiDBConf(c *check.C) { c.Assert(cfg.TiKVClient.MaxBatchSize, check.Equals, uint(0)) } +func (s *serverSuite) TestDataDirServerConfig(c *check.C) { + defer testleak.AfterTest(c)() + cmd := new(cobra.Command) + initServerCmd(cmd) + c.Assert(cmd.ParseFlags([]string{}), check.IsNil) + cfg, err := loadAndVerifyServerConfig(cmd) + c.Assert(err, check.IsNil) + c.Assert(cfg, check.NotNil) + // data dir default to "" + c.Assert(cfg.DataDir, check.Equals, "") + c.Assert(cfg.Sorter.SortDir, check.Equals, filepath.Join("", "/tmp/sorter")) + + dataDir := c.MkDir() + cmd = new(cobra.Command) + initServerCmd(cmd) + c.Assert(cmd.ParseFlags([]string{"--data-dir=" + dataDir}), check.IsNil) + cfg, err = loadAndVerifyServerConfig(cmd) + c.Assert(err, check.IsNil) + c.Assert(cfg, check.NotNil) + c.Assert(cfg.DataDir, check.Equals, dataDir) + // sorter-dir is not set yet + c.Assert(cfg.Sorter.SortDir, check.Equals, "/tmp/sorter") +} + func (s *serverSuite) TestLoadAndVerifyServerConfig(c *check.C) { defer testleak.AfterTest(c)() // test default flag values @@ -44,6 +69,8 @@ func (s *serverSuite) TestLoadAndVerifyServerConfig(c *check.C) { c.Assert(cmd.ParseFlags([]string{}), check.IsNil) cfg, err := loadAndVerifyServerConfig(cmd) c.Assert(err, check.IsNil) + c.Assert(cfg, check.NotNil) + defcfg := config.GetDefaultServerConfig() c.Assert(defcfg.ValidateAndAdjust(), check.IsNil) c.Assert(cfg, check.DeepEquals, defcfg) @@ -71,6 +98,7 @@ func (s *serverSuite) TestLoadAndVerifyServerConfig(c *check.C) { c.Assert(err, check.IsNil) // test flags without config file + dataDir := c.MkDir() cmd = new(cobra.Command) initServerCmd(cmd) c.Assert(cmd.ParseFlags([]string{ @@ -78,6 +106,7 @@ func (s *serverSuite) TestLoadAndVerifyServerConfig(c *check.C) { "--advertise-addr", "127.5.5.1:7777", "--log-file", "/root/cdc.log", "--log-level", "debug", + "--data-dir", dataDir, "--gc-ttl", "10", "--tz", "UTC", "--owner-flush-interval", "150ms", @@ -99,6 +128,7 @@ func (s *serverSuite) TestLoadAndVerifyServerConfig(c *check.C) { AdvertiseAddr: "127.5.5.1:7777", LogFile: "/root/cdc.log", LogLevel: "debug", + DataDir: dataDir, GcTTL: 10, TZ: "UTC", CaptureSessionTTL: 10, @@ -110,7 +140,7 @@ func (s *serverSuite) TestLoadAndVerifyServerConfig(c *check.C) { MaxMemoryPressure: 70, MaxMemoryConsumption: 60000, NumWorkerPoolGoroutine: 90, - SortDir: "/tmp/just_a_test", + SortDir: config.DefaultSortDir, }, Security: &config.SecurityConfig{ CertPath: "bb", @@ -125,15 +155,16 @@ func (s *serverSuite) TestLoadAndVerifyServerConfig(c *check.C) { }) // test decode config file + dataDir = c.MkDir() tmpDir := c.MkDir() configPath := filepath.Join(tmpDir, "ticdc.toml") - configContent := ` + configContent := fmt.Sprintf(` addr = "128.0.0.1:1234" advertise-addr = "127.0.0.1:1111" log-file = "/root/cdc1.log" log-level = "warn" - +data-dir = "%+v" gc-ttl = 500 tz = "US" capture-session-ttl = 10 @@ -148,7 +179,7 @@ max-memory-percentage = 3 num-concurrent-worker = 4 num-workerpool-goroutine = 5 sort-dir = "/tmp/just_a_test" -` +`, dataDir) err = ioutil.WriteFile(configPath, []byte(configContent), 0o644) c.Assert(err, check.IsNil) cmd = new(cobra.Command) @@ -161,6 +192,7 @@ sort-dir = "/tmp/just_a_test" AdvertiseAddr: "127.0.0.1:1111", LogFile: "/root/cdc1.log", LogLevel: "warn", + DataDir: dataDir, GcTTL: 500, TZ: "US", CaptureSessionTTL: 10, @@ -172,7 +204,7 @@ sort-dir = "/tmp/just_a_test" MaxMemoryPressure: 3, MaxMemoryConsumption: 2000000, NumWorkerPoolGoroutine: 5, - SortDir: "/tmp/just_a_test", + SortDir: config.DefaultSortDir, }, Security: &config.SecurityConfig{}, PerTableMemoryQuota: 20 * 1024 * 1024, // 20M @@ -197,6 +229,7 @@ cert-allowed-cn = ["dd","ee"] "--addr", "127.5.5.1:8833", "--log-file", "/root/cdc.log", "--log-level", "debug", + "--data-dir", dataDir, "--gc-ttl", "10", "--tz", "UTC", "--owner-flush-interval", "150ms", @@ -215,6 +248,7 @@ cert-allowed-cn = ["dd","ee"] AdvertiseAddr: "127.0.0.1:1111", LogFile: "/root/cdc.log", LogLevel: "debug", + DataDir: dataDir, GcTTL: 10, TZ: "UTC", CaptureSessionTTL: 10, @@ -226,7 +260,7 @@ cert-allowed-cn = ["dd","ee"] MaxMemoryPressure: 70, MaxMemoryConsumption: 60000000, NumWorkerPoolGoroutine: 5, - SortDir: "/tmp/just_a_test", + SortDir: config.DefaultSortDir, }, Security: &config.SecurityConfig{ CertPath: "bb", diff --git a/errors.toml b/errors.toml index bf62e195a12..c30188f59d1 100755 --- a/errors.toml +++ b/errors.toml @@ -121,6 +121,21 @@ error = ''' failed to request PD ''' +["CDC:ErrCheckDataDirSatisfied"] +error = ''' +check data dir satisfied failed +''' + +["CDC:ErrCheckDirReadable"] +error = ''' +check dir readable failed +''' + +["CDC:ErrCheckDirValid"] +error = ''' +check dir valid failed +''' + ["CDC:ErrCheckDirWritable"] error = ''' check dir writable failed @@ -266,6 +281,11 @@ error = ''' get stores from pd failed ''' +["CDC:ErrGetDiskInfo"] +error = ''' +get dir disk info failed +''' + ["CDC:ErrGetRegionFailed"] error = ''' get region failed diff --git a/go.mod b/go.mod index c160223c7e2..a20f51829ae 100644 --- a/go.mod +++ b/go.mod @@ -14,6 +14,11 @@ require ( github.com/coreos/go-systemd v0.0.0-20190719114852-fd7a80b32e1f // indirect github.com/davecgh/go-spew v1.1.1 github.com/edwingeng/deque v0.0.0-20191220032131-8596380dee17 +<<<<<<< HEAD +======= + github.com/fatih/color v1.10.0 + github.com/frankban/quicktest v1.11.1 // indirect +>>>>>>> 9135351d (CDC Server support data-dir (#1879)) github.com/go-sql-driver/mysql v1.5.0 github.com/golang/protobuf v1.3.4 github.com/golang/snappy v0.0.2 // indirect diff --git a/pkg/config/config.go b/pkg/config/config.go index 7149e901b61..37746260e61 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -29,9 +29,23 @@ import ( "go.uber.org/zap" ) +<<<<<<< HEAD // NewReplicaImpl is true if we using new processor // new owner should be also switched on after it implemented const NewReplicaImpl = false +======= +const ( + // NewReplicaImpl is true if we using new processor + // new owner should be also switched on after it implemented + NewReplicaImpl = true + // DefaultSortDir is the default value of sort-dir, it will be s sub directory of data-dir. + DefaultSortDir = "/tmp/sorter" +) + +func init() { + StoreGlobalServerConfig(GetDefaultServerConfig()) +} +>>>>>>> 9135351d (CDC Server support data-dir (#1879)) var defaultReplicaConfig = &ReplicaConfig{ CaseSensitive: true, @@ -143,6 +157,7 @@ var defaultServerConfig = &ServerConfig{ AdvertiseAddr: "", LogFile: "", LogLevel: "info", + DataDir: "", GcTTL: 24 * 60 * 60, // 24H TZ: "System", // The default election-timeout in PD is 3s and minimum session TTL is 5s, @@ -158,7 +173,7 @@ var defaultServerConfig = &ServerConfig{ MaxMemoryPressure: 80, MaxMemoryConsumption: 8 * 1024 * 1024 * 1024, // 8GB NumWorkerPoolGoroutine: 16, - SortDir: "/tmp/cdc_sort", + SortDir: DefaultSortDir, }, Security: &SecurityConfig{}, PerTableMemoryQuota: 20 * 1024 * 1024, // 20MB @@ -175,6 +190,7 @@ type ServerConfig struct { LogFile string `toml:"log-file" json:"log-file"` LogLevel string `toml:"log-level" json:"log-level"` + DataDir string `toml:"data-dir" json:"data-dir"` GcTTL int64 `toml:"gc-ttl" json:"gc-ttl"` TZ string `toml:"tz" json:"tz"` @@ -272,6 +288,7 @@ func (c *ServerConfig) ValidateAndAdjust() error { if c.Sorter == nil { c.Sorter = defaultServerConfig.Sorter } + c.Sorter.SortDir = DefaultSortDir if c.Sorter.ChunkSizeLimit < 1*1024*1024 { return cerror.ErrIllegalUnifiedSorterParameter.GenWithStackByArgs("chunk-size-limit should be at least 1MB") diff --git a/pkg/config/config_test.go b/pkg/config/config_test.go index a3d5ba2483f..01911e302d1 100644 --- a/pkg/config/config_test.go +++ b/pkg/config/config_test.go @@ -80,15 +80,23 @@ var _ = check.Suite(&serverConfigSuite{}) func (s *serverConfigSuite) TestMarshal(c *check.C) { defer testleak.AfterTest(c)() + rawConfig := `{"addr":"192.155.22.33:8887","advertise-addr":"","log-file":"","log-level":"info","data-dir":"","gc-ttl":86400,"tz":"System","capture-session-ttl":10,"owner-flush-interval":200000000,"processor-flush-interval":100000000,"sorter":{"num-concurrent-worker":4,"chunk-size-limit":999,"max-memory-percentage":30,"max-memory-consumption":17179869184,"num-workerpool-goroutine":16,"sort-dir":"/tmp/sorter"},"security":{"ca-path":"","cert-path":"","key-path":"","cert-allowed-cn":null},"per-table-memory-quota":20971520,"kv-client":{"worker-concurrent":8,"worker-pool-size":0,"region-scan-limit":40}}` + conf := GetDefaultServerConfig() conf.Addr = "192.155.22.33:8887" conf.Sorter.ChunkSizeLimit = 999 b, err := conf.Marshal() c.Assert(err, check.IsNil) +<<<<<<< HEAD c.Assert(b, check.Equals, `{"addr":"192.155.22.33:8887","advertise-addr":"","log-file":"","log-level":"info","gc-ttl":86400,"tz":"System","capture-session-ttl":10,"owner-flush-interval":200000000,"processor-flush-interval":100000000,"sorter":{"num-concurrent-worker":4,"chunk-size-limit":999,"max-memory-percentage":80,"max-memory-consumption":8589934592,"num-workerpool-goroutine":16,"sort-dir":"/tmp/cdc_sort"},"security":{"ca-path":"","cert-path":"","key-path":"","cert-allowed-cn":null},"per-table-memory-quota":20971520,"kv-client":{"worker-concurrent":8,"worker-pool-size":0}}`) conf2 := new(ServerConfig) err = conf2.Unmarshal([]byte(`{"addr":"192.155.22.33:8887","advertise-addr":"","log-file":"","log-level":"info","gc-ttl":86400,"tz":"System","capture-session-ttl":10,"owner-flush-interval":200000000,"processor-flush-interval":100000000,"sorter":{"num-concurrent-worker":4,"chunk-size-limit":999,"max-memory-percentage":80,"max-memory-consumption":8589934592,"num-workerpool-goroutine":16,"sort-dir":"/tmp/cdc_sort"},"security":{"ca-path":"","cert-path":"","key-path":"","cert-allowed-cn":null},"per-table-memory-quota":20971520,"kv-client":{"worker-concurrent":8,"worker-pool-size":0}}`)) +======= + c.Assert(b, check.Equals, rawConfig) + conf2 := new(ServerConfig) + err = conf2.Unmarshal([]byte(rawConfig)) +>>>>>>> 9135351d (CDC Server support data-dir (#1879)) c.Assert(err, check.IsNil) c.Assert(conf2, check.DeepEquals, conf) } diff --git a/pkg/errors/errors.go b/pkg/errors/errors.go index e9b4fe0209e..9f21e5d06cb 100644 --- a/pkg/errors/errors.go +++ b/pkg/errors/errors.go @@ -112,6 +112,10 @@ var ( ErrCheckClusterVersionFromPD = errors.Normalize("failed to request PD", errors.RFCCodeText("CDC:ErrCheckClusterVersionFromPD")) ErrNewSemVersion = errors.Normalize("create sem version", errors.RFCCodeText("CDC:ErrNewSemVersion")) ErrCheckDirWritable = errors.Normalize("check dir writable failed", errors.RFCCodeText("CDC:ErrCheckDirWritable")) + ErrCheckDirReadable = errors.Normalize("check dir readable failed", errors.RFCCodeText("CDC:ErrCheckDirReadable")) + ErrCheckDirValid = errors.Normalize("check dir valid failed", errors.RFCCodeText("CDC:ErrCheckDirValid")) + ErrGetDiskInfo = errors.Normalize("get dir disk info failed", errors.RFCCodeText("CDC:ErrGetDiskInfo")) + ErrCheckDataDirSatisfied = errors.Normalize("check data dir satisfied failed", errors.RFCCodeText("CDC:ErrCheckDataDirSatisfied")) ErrLoadTimezone = errors.Normalize("load timezone", errors.RFCCodeText("CDC:ErrLoadTimezone")) ErrURLFormatInvalid = errors.Normalize("url format is invalid", errors.RFCCodeText("CDC:ErrURLFormatInvalid")) ErrIntersectNoOverlap = errors.Normalize("span doesn't overlap: %+v vs %+v", errors.RFCCodeText("CDC:ErrIntersectNoOverlap")) diff --git a/pkg/util/fileutil.go b/pkg/util/fileutil.go index 5e408d9258e..4b734e88de3 100644 --- a/pkg/util/fileutil.go +++ b/pkg/util/fileutil.go @@ -14,14 +14,24 @@ package util import ( + "fmt" "io/ioutil" "os" "path/filepath" + "syscall" "github.com/pingcap/errors" + "github.com/pingcap/failpoint" + "github.com/pingcap/log" + "github.com/pingcap/ticdc/pkg/config" cerror "github.com/pingcap/ticdc/pkg/errors" ) +const ( + gb = 1024 * 1024 * 1024 + dataDirAvailLowThreshold = 10 // percentage +) + // IsDirAndWritable checks a given path is directory and writable func IsDirAndWritable(path string) error { st, err := os.Stat(path) @@ -42,3 +52,81 @@ func IsDirWritable(dir string) error { } return cerror.WrapError(cerror.ErrCheckDirWritable, os.Remove(f)) } + +// IsDirReadWritable check if the dir is writable and readable by cdc server +func IsDirReadWritable(dir string) error { + f := filepath.Join(dir, "file.test") + if err := ioutil.WriteFile(f, []byte(""), 0o600); err != nil { + return cerror.WrapError(cerror.ErrCheckDirValid, err) + } + + if _, err := ioutil.ReadFile(f); err != nil { + return cerror.WrapError(cerror.ErrCheckDirValid, err) + } + + return cerror.WrapError(cerror.ErrCheckDirValid, os.Remove(f)) +} + +// DiskInfo present the disk amount information, in gb +type DiskInfo struct { + All uint64 + Used uint64 + Free uint64 + Avail uint64 + AvailPercentage float32 +} + +func (d *DiskInfo) String() string { + return fmt.Sprintf("{All: %+vGB; Used: %+vGB; Free: %+vGB; Available: %+vGB; Available Percentage: %+v%%}", + d.All, d.Used, d.Free, d.Avail, d.AvailPercentage) +} + +// GetDiskInfo return the disk space information of the given directory +// the caller should guarantee that dir exist +func GetDiskInfo(dir string) (*DiskInfo, error) { + f := filepath.Join(dir, "file.test") + if err := ioutil.WriteFile(f, []byte(""), 0o600); err != nil { + return nil, cerror.WrapError(cerror.ErrGetDiskInfo, err) + } + + fs := syscall.Statfs_t{} + if err := syscall.Statfs(dir, &fs); err != nil { + return nil, cerror.WrapError(cerror.ErrGetDiskInfo, err) + } + + info := &DiskInfo{ + All: fs.Blocks * uint64(fs.Bsize) / gb, + Avail: fs.Bavail * uint64(fs.Bsize) / gb, + Free: fs.Bfree * uint64(fs.Bsize) / gb, + } + info.Used = info.All - info.Free + info.AvailPercentage = float32(info.Avail) / float32(info.All) * 100 + + if err := os.Remove(f); err != nil { + if !os.IsNotExist(err) { + return info, cerror.WrapError(cerror.ErrGetDiskInfo, err) + } + } + + return info, nil +} + +// CheckDataDirSatisfied check if the data-dir meet the requirement during server running +// the caller should guarantee that dir exist +func CheckDataDirSatisfied() error { + conf := config.GetGlobalServerConfig() + diskInfo, err := GetDiskInfo(conf.DataDir) + if err != nil { + return cerror.WrapError(cerror.ErrCheckDataDirSatisfied, err) + } + if diskInfo.AvailPercentage < dataDirAvailLowThreshold { + failpoint.Inject("InjectCheckDataDirSatisfied", func() { + log.Info("inject check data dir satisfied error") + failpoint.Return(nil) + }) + return cerror.WrapError(cerror.ErrCheckDataDirSatisfied, errors.Errorf("disk is almost full, TiCDC require that the disk mount data-dir "+ + "have 10%% available space, and the total amount has at least 500GB is preferred. disk info: %+v", diskInfo)) + } + + return nil +} diff --git a/pkg/util/fileutil_test.go b/pkg/util/fileutil_test.go index 09736297bea..e498107d962 100644 --- a/pkg/util/fileutil_test.go +++ b/pkg/util/fileutil_test.go @@ -21,6 +21,8 @@ import ( "runtime" "github.com/pingcap/check" + "github.com/pingcap/failpoint" + "github.com/pingcap/ticdc/pkg/config" "github.com/pingcap/ticdc/pkg/util/testleak" ) @@ -62,3 +64,42 @@ func (s *fileUtilSuite) TestIsDirAndWritable(c *check.C) { err = IsDirAndWritable(dir) c.Assert(err, check.IsNil) } + +func (s *fileUtilSuite) TestIsDirReadWritable(c *check.C) { + defer testleak.AfterTest(c)() + + dir := c.MkDir() + err := IsDirReadWritable(dir) + c.Assert(err, check.IsNil) + + path := filepath.Join(dir, "/foo") + err = IsDirReadWritable(path) + c.Assert(err, check.ErrorMatches, ".*no such file or directory") +} + +func (s *fileUtilSuite) TestGetDiskInfo(c *check.C) { + defer testleak.AfterTest(c)() + + dir := c.MkDir() + info, err := GetDiskInfo(dir) + c.Assert(err, check.IsNil) + c.Assert(info, check.NotNil) + + dir = filepath.Join(dir, "/tmp/sorter") + info, err = GetDiskInfo(dir) + c.Assert(info, check.IsNil) + c.Assert(err, check.ErrorMatches, ".*no such file or directory") +} + +func (s *fileUtilSuite) TestCheckDataDirSatisfied(c *check.C) { + defer testleak.AfterTest(c)() + dir := c.MkDir() + conf := config.GetGlobalServerConfig() + conf.DataDir = dir + config.StoreGlobalServerConfig(conf) + + c.Assert(failpoint.Enable("github.com/pingcap/ticdc/pkg/util/InjectCheckDataDirSatisfied", ""), check.IsNil) + err := CheckDataDirSatisfied() + c.Assert(err, check.IsNil) + c.Assert(failpoint.Disable("github.com/pingcap/ticdc/pkg/util/InjectCheckDataDirSatisfied"), check.IsNil) +} diff --git a/tests/_utils/run_cdc_server b/tests/_utils/run_cdc_server index 78ea4ec988e..2be79cc7a76 100755 --- a/tests/_utils/run_cdc_server +++ b/tests/_utils/run_cdc_server @@ -23,6 +23,7 @@ restart= failpoint=$GO_FAILPOINTS config_path= sort_dir= +data_dir= while [[ ${1} ]]; do case "${1}" in @@ -89,6 +90,10 @@ if [ -z "$sort_dir" ]; then sort_dir=${workdir}/cdc-sort${logsuffix} fi +if [ -z "$data_dir" ]; then + data_dir=${workdir}/cdc_data${logsuffix} +fi + echo "[$(date)] <<<<<< START cdc server in $TEST_NAME case >>>>>>" cd $workdir pid=$(ps -C run_cdc_server -o pid=|tr -d '[:space:]') @@ -101,7 +106,8 @@ if [[ "$restart" == "true" ]]; then --log-file $workdir/cdc$logsuffix.log \ --log-level $log_level \ --sorter-num-workerpool-goroutine 4 \ - --sort-dir $sort_dir \ + --data-dir "$data_dir" \ + --sort-dir "$sort_dir" \ $config_path \ $tls \ $certcn \ @@ -117,7 +123,8 @@ else --log-file $workdir/cdc$logsuffix.log \ --log-level $log_level \ --sorter-num-workerpool-goroutine 4 \ - --sort-dir $sort_dir \ + --data-dir "$data_dir" \ + --sort-dir "$sort_dir" \ $config_path \ $tls \ $certcn \ diff --git a/tests/unified_sorter_sort_dir_conflict/run.sh b/tests/unified_sorter_sort_dir_conflict/run.sh index 3e1e29764ee..e26627b2b67 100644 --- a/tests/unified_sorter_sort_dir_conflict/run.sh +++ b/tests/unified_sorter_sort_dir_conflict/run.sh @@ -60,7 +60,7 @@ function prepare() { sleep 20 # starts the first second server instance. It should fail, and bring down the changefeed - run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --addr "127.0.0.1:8301" --logsuffix 2 --sort-dir /tmp/cdc_sort_1 + run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --addr "127.0.0.1:8301" --logsuffix 1 --sort-dir /tmp/cdc_sort_1 ensure $MAX_RETRIES check_changefeed_mark_stopped_regex http://${UP_PD_HOST_1}:${UP_PD_PORT_1} ${changefeedid} ".*ErrConflictingFileLocks.*" kill $capture_pid From e62fc5f1919a338fa7678d38344c342d1e1a6a67 Mon Sep 17 00:00:00 2001 From: JinlingChristopher Date: Thu, 17 Jun 2021 14:41:05 +0800 Subject: [PATCH 02/12] fix conflicts. --- cdc/puller/sorter/backend_pool_test.go | 4 --- cdc/puller/sorter/sorter_test.go | 3 --- cdc/server.go | 37 -------------------------- go.mod | 3 --- pkg/config/config.go | 6 ----- pkg/config/config_test.go | 6 ----- 6 files changed, 59 deletions(-) diff --git a/cdc/puller/sorter/backend_pool_test.go b/cdc/puller/sorter/backend_pool_test.go index d01bfd983cd..c50331b999a 100644 --- a/cdc/puller/sorter/backend_pool_test.go +++ b/cdc/puller/sorter/backend_pool_test.go @@ -116,13 +116,9 @@ func (s *backendPoolSuite) TestDirectoryBadPermission(c *check.C) { err := os.MkdirAll(sortDir, 0o311) c.Assert(err, check.IsNil) -<<<<<<< HEAD - conf := config.GetDefaultServerConfig() -======= conf := config.GetGlobalServerConfig() conf.DataDir = dataDir conf.Sorter.SortDir = sortDir ->>>>>>> 9135351d (CDC Server support data-dir (#1879)) conf.Sorter.MaxMemoryPressure = 0 // force using files backEndPool, err := newBackEndPool(sortDir, "") diff --git a/cdc/puller/sorter/sorter_test.go b/cdc/puller/sorter/sorter_test.go index 5ed2498cae2..7cc89ff0abe 100644 --- a/cdc/puller/sorter/sorter_test.go +++ b/cdc/puller/sorter/sorter_test.go @@ -393,13 +393,10 @@ func (s *sorterSuite) TestSorterIOError(c *check.C) { _ = failpoint.Disable("github.com/pingcap/ticdc/cdc/puller/sorter/InjectErrorBackEndWrite") }() -<<<<<<< HEAD -======= // recreate the sorter sorter, err = NewUnifiedSorter(conf.Sorter.SortDir, "test-cf", "test", 0, "0.0.0.0:0") c.Assert(err, check.IsNil) ->>>>>>> 9135351d (CDC Server support data-dir (#1879)) finishedCh = make(chan struct{}) go func() { err := testSorter(ctx, c, sorter, 10000, true) diff --git a/cdc/server.go b/cdc/server.go index bd2c783e91b..9c26225c807 100644 --- a/cdc/server.go +++ b/cdc/server.go @@ -100,47 +100,10 @@ func (s *Server) Run(ctx context.Context) error { return cerror.WrapError(cerror.ErrServerNewPDClient, err) } s.pdClient = pdClient -<<<<<<< HEAD -======= - if config.NewReplicaImpl { - tlsConfig, err := conf.Security.ToTLSConfig() - if err != nil { - return errors.Trace(err) - } - logConfig := logutil.DefaultZapLoggerConfig - logConfig.Level = zap.NewAtomicLevelAt(zapcore.ErrorLevel) - etcdCli, err := clientv3.New(clientv3.Config{ - Endpoints: s.pdEndpoints, - TLS: tlsConfig, - Context: ctx, - LogConfig: &logConfig, - DialTimeout: 5 * time.Second, - DialOptions: []grpc.DialOption{ - grpcTLSOption, - grpc.WithBlock(), - grpc.WithConnectParams(grpc.ConnectParams{ - Backoff: backoff.Config{ - BaseDelay: time.Second, - Multiplier: 1.1, - Jitter: 0.1, - MaxDelay: 3 * time.Second, - }, - MinConnectTimeout: 3 * time.Second, - }), - }, - }) - if err != nil { - return errors.Annotate(cerror.WrapError(cerror.ErrNewCaptureFailed, err), "new etcd client") - } - etcdClient := kv.NewCDCEtcdClient(ctx, etcdCli) - s.etcdClient = &etcdClient - } if err := s.initDataDir(ctx); err != nil { return errors.Trace(err) } ->>>>>>> 9135351d (CDC Server support data-dir (#1879)) - // To not block CDC server startup, we need to warn instead of error // when TiKV is incompatible. errorTiKVIncompatible := false diff --git a/go.mod b/go.mod index a20f51829ae..9b8fe9942d3 100644 --- a/go.mod +++ b/go.mod @@ -14,11 +14,8 @@ require ( github.com/coreos/go-systemd v0.0.0-20190719114852-fd7a80b32e1f // indirect github.com/davecgh/go-spew v1.1.1 github.com/edwingeng/deque v0.0.0-20191220032131-8596380dee17 -<<<<<<< HEAD -======= github.com/fatih/color v1.10.0 github.com/frankban/quicktest v1.11.1 // indirect ->>>>>>> 9135351d (CDC Server support data-dir (#1879)) github.com/go-sql-driver/mysql v1.5.0 github.com/golang/protobuf v1.3.4 github.com/golang/snappy v0.0.2 // indirect diff --git a/pkg/config/config.go b/pkg/config/config.go index 37746260e61..684478fc1d4 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -29,11 +29,6 @@ import ( "go.uber.org/zap" ) -<<<<<<< HEAD -// NewReplicaImpl is true if we using new processor -// new owner should be also switched on after it implemented -const NewReplicaImpl = false -======= const ( // NewReplicaImpl is true if we using new processor // new owner should be also switched on after it implemented @@ -45,7 +40,6 @@ const ( func init() { StoreGlobalServerConfig(GetDefaultServerConfig()) } ->>>>>>> 9135351d (CDC Server support data-dir (#1879)) var defaultReplicaConfig = &ReplicaConfig{ CaseSensitive: true, diff --git a/pkg/config/config_test.go b/pkg/config/config_test.go index 01911e302d1..96bb91646fc 100644 --- a/pkg/config/config_test.go +++ b/pkg/config/config_test.go @@ -88,15 +88,9 @@ func (s *serverConfigSuite) TestMarshal(c *check.C) { b, err := conf.Marshal() c.Assert(err, check.IsNil) -<<<<<<< HEAD - c.Assert(b, check.Equals, `{"addr":"192.155.22.33:8887","advertise-addr":"","log-file":"","log-level":"info","gc-ttl":86400,"tz":"System","capture-session-ttl":10,"owner-flush-interval":200000000,"processor-flush-interval":100000000,"sorter":{"num-concurrent-worker":4,"chunk-size-limit":999,"max-memory-percentage":80,"max-memory-consumption":8589934592,"num-workerpool-goroutine":16,"sort-dir":"/tmp/cdc_sort"},"security":{"ca-path":"","cert-path":"","key-path":"","cert-allowed-cn":null},"per-table-memory-quota":20971520,"kv-client":{"worker-concurrent":8,"worker-pool-size":0}}`) - conf2 := new(ServerConfig) - err = conf2.Unmarshal([]byte(`{"addr":"192.155.22.33:8887","advertise-addr":"","log-file":"","log-level":"info","gc-ttl":86400,"tz":"System","capture-session-ttl":10,"owner-flush-interval":200000000,"processor-flush-interval":100000000,"sorter":{"num-concurrent-worker":4,"chunk-size-limit":999,"max-memory-percentage":80,"max-memory-consumption":8589934592,"num-workerpool-goroutine":16,"sort-dir":"/tmp/cdc_sort"},"security":{"ca-path":"","cert-path":"","key-path":"","cert-allowed-cn":null},"per-table-memory-quota":20971520,"kv-client":{"worker-concurrent":8,"worker-pool-size":0}}`)) -======= c.Assert(b, check.Equals, rawConfig) conf2 := new(ServerConfig) err = conf2.Unmarshal([]byte(rawConfig)) ->>>>>>> 9135351d (CDC Server support data-dir (#1879)) c.Assert(err, check.IsNil) c.Assert(conf2, check.DeepEquals, conf) } From 9a68c908a220f3cd0dd7492a937c3cd7a3f3cb71 Mon Sep 17 00:00:00 2001 From: JinlingChristopher Date: Mon, 21 Jun 2021 22:33:19 +0800 Subject: [PATCH 03/12] add etcdClient to server, try to fix lack of field. --- cdc/server.go | 37 +++++++++++++++++++++++++++++++++++++ 1 file changed, 37 insertions(+) diff --git a/cdc/server.go b/cdc/server.go index 9c26225c807..daaedf240be 100644 --- a/cdc/server.go +++ b/cdc/server.go @@ -34,8 +34,11 @@ import ( "github.com/pingcap/ticdc/pkg/version" "github.com/prometheus/client_golang/prometheus" pd "github.com/tikv/pd/client" + "go.etcd.io/etcd/clientv3" "go.etcd.io/etcd/mvcc" + "go.etcd.io/etcd/pkg/logutil" "go.uber.org/zap" + "go.uber.org/zap/zapcore" "golang.org/x/sync/errgroup" "golang.org/x/time/rate" "google.golang.org/grpc" @@ -56,6 +59,7 @@ type Server struct { ownerLock sync.RWMutex statusServer *http.Server pdClient pd.Client + etcdClient *kv.CDCEtcdClient pdEndpoints []string } @@ -100,6 +104,39 @@ func (s *Server) Run(ctx context.Context) error { return cerror.WrapError(cerror.ErrServerNewPDClient, err) } s.pdClient = pdClient + if config.NewReplicaImpl { + tlsConfig, err := conf.Security.ToTLSConfig() + if err != nil { + return errors.Trace(err) + } + logConfig := logutil.DefaultZapLoggerConfig + logConfig.Level = zap.NewAtomicLevelAt(zapcore.ErrorLevel) + etcdCli, err := clientv3.New(clientv3.Config{ + Endpoints: s.pdEndpoints, + TLS: tlsConfig, + Context: ctx, + LogConfig: &logConfig, + DialTimeout: 5 * time.Second, + DialOptions: []grpc.DialOption{ + grpcTLSOption, + grpc.WithBlock(), + grpc.WithConnectParams(grpc.ConnectParams{ + Backoff: backoff.Config{ + BaseDelay: time.Second, + Multiplier: 1.1, + Jitter: 0.1, + MaxDelay: 3 * time.Second, + }, + MinConnectTimeout: 3 * time.Second, + }), + }, + }) + if err != nil { + return errors.Annotate(cerror.WrapError(cerror.ErrNewCaptureFailed, err), "new etcd client") + } + etcdClient := kv.NewCDCEtcdClient(ctx, etcdCli) + s.etcdClient = &etcdClient + } if err := s.initDataDir(ctx); err != nil { return errors.Trace(err) From 2a3748eff823b479300d351d2da8b175ac70eec5 Mon Sep 17 00:00:00 2001 From: JinlingChristopher Date: Mon, 21 Jun 2021 22:54:31 +0800 Subject: [PATCH 04/12] fix server config marshal test. --- pkg/config/config_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/config/config_test.go b/pkg/config/config_test.go index 96bb91646fc..4be6f7b75c4 100644 --- a/pkg/config/config_test.go +++ b/pkg/config/config_test.go @@ -80,7 +80,7 @@ var _ = check.Suite(&serverConfigSuite{}) func (s *serverConfigSuite) TestMarshal(c *check.C) { defer testleak.AfterTest(c)() - rawConfig := `{"addr":"192.155.22.33:8887","advertise-addr":"","log-file":"","log-level":"info","data-dir":"","gc-ttl":86400,"tz":"System","capture-session-ttl":10,"owner-flush-interval":200000000,"processor-flush-interval":100000000,"sorter":{"num-concurrent-worker":4,"chunk-size-limit":999,"max-memory-percentage":30,"max-memory-consumption":17179869184,"num-workerpool-goroutine":16,"sort-dir":"/tmp/sorter"},"security":{"ca-path":"","cert-path":"","key-path":"","cert-allowed-cn":null},"per-table-memory-quota":20971520,"kv-client":{"worker-concurrent":8,"worker-pool-size":0,"region-scan-limit":40}}` + rawConfig := `{"addr":"192.155.22.33:8887","advertise-addr":"","log-file":"","log-level":"info","data-dir":"","gc-ttl":86400,"tz":"System","capture-session-ttl":10,"owner-flush-interval":200000000,"processor-flush-interval":100000000,"sorter":{"num-concurrent-worker":4,"chunk-size-limit":999,"max-memory-percentage":30,"max-memory-consumption":17179869184,"num-workerpool-goroutine":16,"sort-dir":"/tmp/sorter"},"security":{"ca-path":"","cert-path":"","key-path":"","cert-allowed-cn":null},"per-table-memory-quota":20971520,"kv-client":{"worker-concurrent":8,"worker-pool-size":0}}` conf := GetDefaultServerConfig() conf.Addr = "192.155.22.33:8887" From e122742e1c166e680bb65c0cf32250a16a38a77b Mon Sep 17 00:00:00 2001 From: JinlingChristopher Date: Mon, 21 Jun 2021 23:22:40 +0800 Subject: [PATCH 05/12] make check, fix go mod. --- go.mod | 1 - go.sum | 27 --------------------------- 2 files changed, 28 deletions(-) diff --git a/go.mod b/go.mod index 9b8fe9942d3..1781199fa35 100644 --- a/go.mod +++ b/go.mod @@ -15,7 +15,6 @@ require ( github.com/davecgh/go-spew v1.1.1 github.com/edwingeng/deque v0.0.0-20191220032131-8596380dee17 github.com/fatih/color v1.10.0 - github.com/frankban/quicktest v1.11.1 // indirect github.com/go-sql-driver/mysql v1.5.0 github.com/golang/protobuf v1.3.4 github.com/golang/snappy v0.0.2 // indirect diff --git a/go.sum b/go.sum index 5db18d9fe15..5a3efe00f72 100644 --- a/go.sum +++ b/go.sum @@ -48,14 +48,11 @@ github.com/HdrHistogram/hdrhistogram-go v0.9.0 h1:dpujRju0R4M/QZzcnR1LH1qm+TVG3U github.com/HdrHistogram/hdrhistogram-go v0.9.0/go.mod h1:nxrse8/Tzg2tg3DZcZjm6qEclQKK70g0KxO61gFFZD4= github.com/Jeffail/gabs/v2 v2.5.1 h1:ANfZYjpMlfTTKebycu4X1AgkVWumFVDYQl7JwOr4mDk= github.com/Jeffail/gabs/v2 v2.5.1/go.mod h1:xCn81vdHKxFUuWWAaD5jCTQDNPBMh5pPs9IJ+NcziBI= -github.com/KyleBanks/depth v1.2.1 h1:5h8fQADFrWtarTdtDudMmGsC7GPbOAu6RVB3ffsVFHc= github.com/KyleBanks/depth v1.2.1/go.mod h1:jzSb9d0L43HxTQfT+oSA1EEp2q+ne2uh6XgeJcm8brE= github.com/OneOfOne/xxhash v1.2.2 h1:KMrpdQIwFcEqXDklaen+P1axHaj9BSKzvpUUfnHldSE= github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU= github.com/PuerkitoBio/purell v1.1.0/go.mod h1:c11w/QuzBsJSee3cPx9rAFu61PvFxuPbtSwDGJws/X0= -github.com/PuerkitoBio/purell v1.1.1 h1:WEQqlqaGbrPkxLJWfBwQmfEAE1Z7ONdDLqrN38tNFfI= github.com/PuerkitoBio/purell v1.1.1/go.mod h1:c11w/QuzBsJSee3cPx9rAFu61PvFxuPbtSwDGJws/X0= -github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578 h1:d+Bc7a5rLufV/sSk/8dngufqelfh6jnri85riMAaF/M= github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578/go.mod h1:uGdkoq3SwY9Y+13GIhn11/XLaGBb4BfwItxLd5jeuXE= github.com/Shopify/sarama v1.27.2 h1:1EyY1dsxNDUQEv0O/4TsjosHI2CgB1uo9H/v56xzTxc= github.com/Shopify/sarama v1.27.2/go.mod h1:g5s5osgELxgM+Md9Qni9rzo7Rbt+vvFQI4bt/Mc93II= @@ -146,10 +143,8 @@ github.com/coreos/pkg v0.0.0-20160727233714-3ac0863d7acf/go.mod h1:E3G3o1h8I7cfc github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f h1:lBNOc5arjvs8E5mO2tbpBpLoyyu8B6e44T7hJy6potg= github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f/go.mod h1:E3G3o1h8I7cfcXa63jLwjI0eiQQMgzzUDFVpN/nH/eA= github.com/corona10/goimagehash v1.0.2/go.mod h1:/l9umBhvcHQXVtQO1V6Gp1yD20STawkhRnnX0D1bvVI= -github.com/cpuguy83/go-md2man v1.0.10 h1:BSKMNlYxDvnunlTymqtgONjNnaRV1sTpcovwwjF22jk= github.com/cpuguy83/go-md2man v1.0.10/go.mod h1:SmD6nW6nTyfqj6ABTjUi3V3JVMnlJmwcJI5acqYI6dE= github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU= -github.com/cpuguy83/go-md2man/v2 v2.0.0 h1:EoUDS0afbrsXAZ9YQ9jdu/mZ2sXgT1/2yyNng4PGlyM= github.com/cpuguy83/go-md2man/v2 v2.0.0/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU= github.com/creack/pty v1.1.7/go.mod h1:lj5s0c3V2DBrqTV7llrYr5NG6My20zk30Fl46Y7DoTY= github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= @@ -201,7 +196,6 @@ github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5Kwzbycv github.com/fatih/color v1.9.0/go.mod h1:eQcE1qtQxscV5RaZvpXrrb8Drkc3/DdQ+uUYCNjL+zU= github.com/fatih/color v1.10.0 h1:s36xzo75JdqLaaWoiEHk767eHiwo0598uUxyfiPkDsg= github.com/fatih/color v1.10.0/go.mod h1:ELkj/draVOlAH/xkhN6mQ50Qd0MPOk5AAr3maGEBuJM= -github.com/fatih/structtag v1.2.0 h1:/OdNE99OxoI/PqaW/SuSK9uxxT3f/tcSZgon/ssNSx4= github.com/fatih/structtag v1.2.0/go.mod h1:mBJUNpUnHmRKrKlQQlmCrh5PuhftFbNv8Ys4/aAZl94= github.com/fogleman/gg v1.3.0/go.mod h1:R/bRT+9gY/C5z7JzPU0zXsXHKM4/ayA+zqcVNZzPa1k= github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw= @@ -213,7 +207,6 @@ github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= github.com/fsouza/fake-gcs-server v1.17.0 h1:OeH75kBZcZa3ZE+zz/mFdJ2btt9FgqfjI7gIh9+5fvk= github.com/fsouza/fake-gcs-server v1.17.0/go.mod h1:D1rTE4YCyHFNa99oyJJ5HyclvN/0uQR+pM/VdlL83bw= -github.com/ghodss/yaml v1.0.0 h1:wQHKEahhL6wmXdzwWG11gIVCkOv05bNOh+Rxn0yngAk= github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04= github.com/gin-contrib/gzip v0.0.1/go.mod h1:fGBJBCdt6qCZuCAOwWuFhBB4OOq9EFqlo5dEaFhhu5w= github.com/gin-contrib/sse v0.0.0-20170109093832-22d885f9ecc7/go.mod h1:VJ0WA2NBN22VlZ2dKZQPAPnyWw5XTlK1KymzLKsr59s= @@ -236,19 +229,15 @@ github.com/go-ole/go-ole v1.2.4 h1:nNBDSCOigTSiarFpYE9J/KtEA1IOW4CNeqT9TQDqCxI= github.com/go-ole/go-ole v1.2.4/go.mod h1:XCwSNxSkXRo4vlyPy93sltvi/qJq0jqQhjqQNIwKuxM= github.com/go-openapi/jsonpointer v0.17.0/go.mod h1:cOnomiV+CVVwFLk0A/MExoFMjwdsUdVpsRhURCKh+3M= github.com/go-openapi/jsonpointer v0.19.2/go.mod h1:3akKfEdA7DF1sugOqz1dVQHBcuDBPKZGEoHC/NkiQRg= -github.com/go-openapi/jsonpointer v0.19.3 h1:gihV7YNZK1iK6Tgwwsxo2rJbD1GTbdm72325Bq8FI3w= github.com/go-openapi/jsonpointer v0.19.3/go.mod h1:Pl9vOtqEWErmShwVjC8pYs9cog34VGT37dQOVbmoatg= github.com/go-openapi/jsonreference v0.17.0/go.mod h1:g4xxGn04lDIRh0GJb5QlpE3HfopLOL6uZrK/VgnsK9I= github.com/go-openapi/jsonreference v0.19.0/go.mod h1:g4xxGn04lDIRh0GJb5QlpE3HfopLOL6uZrK/VgnsK9I= github.com/go-openapi/jsonreference v0.19.2/go.mod h1:jMjeRr2HHw6nAVajTXJ4eiUwohSTlpa0o73RUL1owJc= -github.com/go-openapi/jsonreference v0.19.3 h1:5cxNfTy0UVC3X8JL5ymxzyoUZmo8iZb+jeTWn7tUa8o= github.com/go-openapi/jsonreference v0.19.3/go.mod h1:rjx6GuL8TTa9VaixXglHmQmIL98+wF9xc8zWvFonSJ8= github.com/go-openapi/spec v0.19.0/go.mod h1:XkF/MOi14NmjsfZ8VtAKf8pIlbZzyoTvZsdfssdxcBI= -github.com/go-openapi/spec v0.19.4 h1:ixzUSnHTd6hCemgtAJgluaTSGYpLNpJY4mA2DIkdOAo= github.com/go-openapi/spec v0.19.4/go.mod h1:FpwSN1ksY1eteniUU7X0N/BgJ7a4WvBFVA8Lj9mJglo= github.com/go-openapi/swag v0.17.0/go.mod h1:AByQ+nYG6gQg71GINrmuDXCPWdL640yX49/kXLo40Tg= github.com/go-openapi/swag v0.19.2/go.mod h1:POnQmlKehdgb5mhVOsnJFsivZCEZ/vjK9gh66Z9tfKk= -github.com/go-openapi/swag v0.19.5 h1:lTz6Ys4CmqqCQmZPBlbQENR1/GucA2bzYTE12Pw4tFY= github.com/go-openapi/swag v0.19.5/go.mod h1:POnQmlKehdgb5mhVOsnJFsivZCEZ/vjK9gh66Z9tfKk= github.com/go-playground/locales v0.12.1/go.mod h1:IUMDtCfWo/w/mtMfIE/IG2K+Ey3ygWanZIBtBW0W2TM= github.com/go-playground/overalls v0.0.0-20180201144345-22ec1a223b7c/go.mod h1:UqxAgEOt89sCiXlrc/ycnx00LVvUO/eS8tMUkWX4R7w= @@ -437,7 +426,6 @@ github.com/mackerelio/go-osstat v0.1.0/go.mod h1:1K3NeYLhMHPvzUu+ePYXtoB58wkaRpx github.com/magiconair/properties v1.8.0/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ= github.com/mailru/easyjson v0.0.0-20180823135443-60711f1a8329/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc= github.com/mailru/easyjson v0.0.0-20190614124828-94de47d64c63/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc= -github.com/mailru/easyjson v0.0.0-20190626092158-b2ccc519800e h1:hB2xlXdHp/pmPZq0y3QnmWAArdw9PqbmotexnWx/FU8= github.com/mailru/easyjson v0.0.0-20190626092158-b2ccc519800e/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc= github.com/mattn/go-colorable v0.0.9/go.mod h1:9vuHe8Xs5qXnSaW/c/ABM9alt+Vo+STaOChaDxuIBZU= github.com/mattn/go-colorable v0.1.2/go.mod h1:U0ppj6V5qS13XJ6of8GYAs25YV2eR4EVcfRqFIhoBtE= @@ -466,11 +454,8 @@ github.com/mattn/go-sqlite3 v2.0.2+incompatible h1:qzw9c2GNT8UFrgWNDhCTqRqYUSmu/ github.com/mattn/go-sqlite3 v2.0.2+incompatible/go.mod h1:FPy6KqzDD04eiIsT53CuJW3U88zkxoIYsOqkbpncsNc= github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU= github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0= -github.com/mgechev/dots v0.0.0-20190921121421-c36f7dcfbb81 h1:QASJXOGm2RZ5Ardbc86qNFvby9AqkLDibfChMtAg5QM= github.com/mgechev/dots v0.0.0-20190921121421-c36f7dcfbb81/go.mod h1:KQ7+USdGKfpPjXk4Ga+5XxQM4Lm4e3gAogrreFAYpOg= -github.com/mgechev/revive v1.0.2 h1:v0NxxQ7fSFz/u1NQydPo6EGdq7va0J1BtsZmae6kzUg= github.com/mgechev/revive v1.0.2/go.mod h1:rb0dQy1LVAxW9SWy5R3LPUjevzUbUS316U5MFySA2lo= -github.com/mitchellh/go-homedir v1.1.0 h1:lukF9ziXFxDFPkA1vsr5zpc1XuPDn/wFntq5mG+4E0Y= github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0= github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y= github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= @@ -500,7 +485,6 @@ github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLA github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn4U= github.com/oleiade/reflections v1.0.1/go.mod h1:rdFxbxq4QXVZWj0F+e9jqjDkc7dbp97vkRixKo2JR60= github.com/olekukonko/tablewriter v0.0.0-20170122224234-a0225b3f23b5/go.mod h1:vsDQFd/mU46D+Z4whnwzcISnGGzXWMclvtLoiIKAKIo= -github.com/olekukonko/tablewriter v0.0.4 h1:vHD/YYe1Wolo78koG299f7V/VAS08c6IpCLn+Ejf/w8= github.com/olekukonko/tablewriter v0.0.4/go.mod h1:zq6QwlOf5SlnkVbMSr5EoBv3636FWnp+qbPhuoO21uA= github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= github.com/onsi/ginkgo v1.7.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= @@ -516,7 +500,6 @@ github.com/opentracing/opentracing-go v1.1.0/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFSt github.com/pborman/getopt v0.0.0-20180729010549-6fdd0a2c7117/go.mod h1:85jBQOZwpVEaDAr341tbn15RS4fCAsIst0qp7i8ex1o= github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic= github.com/pelletier/go-toml v1.3.0/go.mod h1:PN7xzY2wHTK0K9p34ErDQMlFxa51Fk0OUruD3k1mMwo= -github.com/petermattis/goid v0.0.0-20180202154549-b0b1615b78e5 h1:q2e307iGHPdTGp0hoxKjt1H5pDo6utceo3dQVK3I5XQ= github.com/petermattis/goid v0.0.0-20180202154549-b0b1615b78e5/go.mod h1:jvVRKCrJTQWu0XVbaOlby/2lO20uSCHEMzzplHXte1o= github.com/phayes/freeport v0.0.0-20180830031419-95f893ade6f2/go.mod h1:iIss55rKnNBTvrwdmkUpLnDpZoAHvWaiq5+iMmen4AE= github.com/phf/go-queue v0.0.0-20170504031614-9abe38d0371d h1:U+PMnTlV2tu7RuMK5etusZG3Cf+rpow5hqQByeCzJ2g= @@ -544,7 +527,6 @@ github.com/pingcap/errors v0.11.5-0.20201029093017-5a7df2af2ac7/go.mod h1:G7x87l github.com/pingcap/errors v0.11.5-0.20201126102027-b0a155152ca3 h1:LllgC9eGfqzkfubMgjKIDyZYaa609nNWAyNZtpy2B3M= github.com/pingcap/errors v0.11.5-0.20201126102027-b0a155152ca3/go.mod h1:G7x87le1poQzLB/TqvTJI2ILrSgobnq4Ut7luOwvfvI= github.com/pingcap/failpoint v0.0.0-20191029060244-12f4ac2fd11d/go.mod h1:DNS3Qg7bEDhU6EXNHF+XSv/PGznQaMJ5FWvctpm6pQI= -github.com/pingcap/failpoint v0.0.0-20200702092429-9f69995143ce h1:Y1kCxlCtlPTMtVcOkjUcuQKh+YrluSo7+7YMCQSzy30= github.com/pingcap/failpoint v0.0.0-20200702092429-9f69995143ce/go.mod h1:w4PEZ5y16LeofeeGwdgZB4ddv9bLyDuIX+ljstgKZyk= github.com/pingcap/failpoint v0.0.0-20210316064728-7acb0f0a3dfd h1:I8IeI8MNiZVKnwuXhcIIzz6pREcOSbq18Q31KYIzFVM= github.com/pingcap/failpoint v0.0.0-20210316064728-7acb0f0a3dfd/go.mod h1:IVF+ijPSMZVtx2oIqxAg7ur6EyixtTYfOHwpfmlhqI4= @@ -560,7 +542,6 @@ github.com/pingcap/kvproto v0.0.0-20210308063835-39b884695fb8 h1:t72qxPxunoKykkA github.com/pingcap/kvproto v0.0.0-20210308063835-39b884695fb8/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI= github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8= github.com/pingcap/log v0.0.0-20200511115504-543df19646ad/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8= -github.com/pingcap/log v0.0.0-20201112100606-8f1e84a3abc8 h1:M+DNpOu/I3uDmwee6vcnoPd6GgSMqND4gxvDQ/W584U= github.com/pingcap/log v0.0.0-20201112100606-8f1e84a3abc8/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8= github.com/pingcap/log v0.0.0-20210317133921-96f4fcab92a4 h1:ERrF0fTuIOnwfGbt71Ji3DKbOEaP189tjym50u8gpC8= github.com/pingcap/log v0.0.0-20210317133921-96f4fcab92a4/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8= @@ -624,15 +605,11 @@ github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6L github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= github.com/rs/cors v1.7.0 h1:+88SsELBHx5r+hZ8TCkggzSstaWNbDvThkVK8H6f9ik= github.com/rs/cors v1.7.0/go.mod h1:gFx+x8UowdsKA9AchylcLynDq+nNFfI8FkUZdN/jGCU= -github.com/russross/blackfriday v1.5.2 h1:HyvC0ARfnZBqnXwABFeSZHpKvJHJJfPz81GNueLj0oo= github.com/russross/blackfriday v1.5.2/go.mod h1:JO/DiYxRf+HjHt06OyowR9PTA263kcR/rfWxYHBV53g= -github.com/russross/blackfriday/v2 v2.0.1 h1:lPqVAte+HuHNfhJ/0LC98ESWRz8afy9tM/0RK8m9o+Q= github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= -github.com/sasha-s/go-deadlock v0.2.0 h1:lMqc+fUb7RrFS3gQLtoQsJ7/6TV/pAIFvBsqX73DK8Y= github.com/sasha-s/go-deadlock v0.2.0/go.mod h1:StQn567HiB1fF2yJ44N9au7wOhrPS3iZqiDbRupzT10= github.com/satori/go.uuid v1.2.0/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0= github.com/sergi/go-diff v1.0.1-0.20180205163309-da645544ed44/go.mod h1:0CfEIISq7TuYL3j771MWULgwwjU+GofnZX9QAmXWZgo= -github.com/sergi/go-diff v1.1.0 h1:we8PVUC3FE2uYfodKH/nBHMSetSfHDR6scGdBi+erh0= github.com/sergi/go-diff v1.1.0/go.mod h1:STckp+ISIX8hZLjrqAeVduY0gWCT9IjLuqbuNXdaHfM= github.com/shirou/gopsutil v2.19.10+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA= github.com/shirou/gopsutil v3.20.12+incompatible h1:6VEGkOXP/eP4o2Ilk8cSsX0PhOEfX6leqAnD+urrp9M= @@ -641,7 +618,6 @@ github.com/shirou/w32 v0.0.0-20160930032740-bb4de0191aa4/go.mod h1:qsXQc7+bwAM3Q github.com/shurcooL/httpfs v0.0.0-20190707220628-8d4bc4ba7749 h1:bUGsEnyNbVPw06Bs80sCeARAlK8lhwqGyi6UT8ymuGk= github.com/shurcooL/httpfs v0.0.0-20190707220628-8d4bc4ba7749/go.mod h1:ZY1cvUeJuFPAdZ/B6v7RHavJWZn2YPVFQ1OSXhCGOkg= github.com/shurcooL/httpgzip v0.0.0-20190720172056-320755c1c1b0/go.mod h1:919LwcH0M7/W4fcZ0/jy0qGght1GIhqyS/EgWGH2j5Q= -github.com/shurcooL/sanitized_anchor_name v1.0.0 h1:PdmoCO6wvbs+7yrJyMORt4/BmY5IYyJwS/kOiWx8mHo= github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc= github.com/shurcooL/vfsgen v0.0.0-20181202132449-6a9ea43bcacd h1:ug7PpSOB5RBPK1Kg6qskGBoP3Vnj/aNYFTznWvlkGo0= github.com/shurcooL/vfsgen v0.0.0-20181202132449-6a9ea43bcacd/go.mod h1:TrYk7fJVaAttu97ZZKrO9UbRa8izdowaMIZcxYMbVaw= @@ -685,7 +661,6 @@ github.com/swaggo/gin-swagger v1.2.0/go.mod h1:qlH2+W7zXGZkczuL+r2nEBR2JTT+/lX05 github.com/swaggo/http-swagger v0.0.0-20200308142732-58ac5e232fba/go.mod h1:O1lAbCgAAX/KZ80LM/OXwtWFI/5TvZlwxSg8Cq08PV0= github.com/swaggo/swag v1.5.1/go.mod h1:1Bl9F/ZBpVWh22nY0zmYyASPO1lI/zIwRDrpZU+tv8Y= github.com/swaggo/swag v1.6.3/go.mod h1:wcc83tB4Mb2aNiL/HP4MFeQdpHUrca+Rp/DRNgWAUio= -github.com/swaggo/swag v1.6.6-0.20200529100950-7c765ddd0476 h1:UjnSXdNPIG+5FJ6xLQODEdk7gSnJlMldu3sPAxxCO+4= github.com/swaggo/swag v1.6.6-0.20200529100950-7c765ddd0476/go.mod h1:xDhTyuFIujYiN3DKWC/H/83xcfHp+UE/IzWWampG7Zc= github.com/syndtr/goleveldb v1.0.1-0.20190318030020-c3a204f8e965 h1:1oFLiOyVl+W7bnBzGhf7BbIv9loSFQcieWWYIjLqcAw= github.com/syndtr/goleveldb v1.0.1-0.20190318030020-c3a204f8e965/go.mod h1:9OrXJhf154huy1nPWmuSrkgjPUtUNhA+Zmy+6AESzuA= @@ -719,9 +694,7 @@ github.com/ugorji/go/codec v0.0.0-20181204163529-d75b2dcb6bc8/go.mod h1:VFNgLljT github.com/ugorji/go/codec v1.1.5-pre/go.mod h1:tULtS6Gy1AE1yCENaw4Vb//HLH5njI2tfCQDUqRd8fI= github.com/ugorji/go/codec v1.1.7/go.mod h1:Ax+UKWsSmolVDwsd+7N3ZtXu+yMGCf907BLYF3GoBXY= github.com/unrolled/render v1.0.1/go.mod h1:gN9T0NhL4Bfbwu8ann7Ry/TGHYfosul+J0obPf6NBdM= -github.com/urfave/cli v1.20.0 h1:fDqGv3UG/4jbVl/QkFwEdddtEDjh/5Ov6X+0B/3bPaw= github.com/urfave/cli v1.20.0/go.mod h1:70zkFmudgCuE/ngEzBv17Jvp/497gISqfk5gWijbERA= -github.com/urfave/cli/v2 v2.1.1 h1:Qt8FeAtxE/vfdrLmR3rxR6JRE0RoVmbXu8+6kZtYU4k= github.com/urfave/cli/v2 v2.1.1/go.mod h1:SE9GqnLQmjVa0iPEY0f1w3ygNIYcIJ0OKPMoW2caLfQ= github.com/urfave/negroni v0.3.0/go.mod h1:Meg73S6kFm/4PpbYdq35yYWoCZ9mS/YSx+lKnmiohz4= github.com/valyala/gozstd v1.7.0 h1:Ljh5c9zboqLhwTI33al32R72iCZfn0mCbVGcFWbGwRQ= From 5fa87d04ad93cf05eb3f4bcb5b46f4ec58093cea Mon Sep 17 00:00:00 2001 From: JinlingChristopher Date: Mon, 21 Jun 2021 23:33:56 +0800 Subject: [PATCH 06/12] fix config test, make ci happy. --- pkg/config/config_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/config/config_test.go b/pkg/config/config_test.go index 4be6f7b75c4..ea53bc58dc8 100644 --- a/pkg/config/config_test.go +++ b/pkg/config/config_test.go @@ -80,7 +80,7 @@ var _ = check.Suite(&serverConfigSuite{}) func (s *serverConfigSuite) TestMarshal(c *check.C) { defer testleak.AfterTest(c)() - rawConfig := `{"addr":"192.155.22.33:8887","advertise-addr":"","log-file":"","log-level":"info","data-dir":"","gc-ttl":86400,"tz":"System","capture-session-ttl":10,"owner-flush-interval":200000000,"processor-flush-interval":100000000,"sorter":{"num-concurrent-worker":4,"chunk-size-limit":999,"max-memory-percentage":30,"max-memory-consumption":17179869184,"num-workerpool-goroutine":16,"sort-dir":"/tmp/sorter"},"security":{"ca-path":"","cert-path":"","key-path":"","cert-allowed-cn":null},"per-table-memory-quota":20971520,"kv-client":{"worker-concurrent":8,"worker-pool-size":0}}` + rawConfig := `{"addr":"192.155.22.33:8887","advertise-addr":"","log-file":"","log-level":"info","data-dir":"","gc-ttl":86400,"tz":"System","capture-session-ttl":10,"owner-flush-interval":200000000,"processor-flush-interval":100000000,"sorter":{"num-concurrent-worker":4,"chunk-size-limit":999,"max-memory-percentage":80,"max-memory-consumption":8589934592,"num-workerpool-goroutine":16,"sort-dir":"/tmp/sorter"},"security":{"ca-path":"","cert-path":"","key-path":"","cert-allowed-cn":null},"per-table-memory-quota":20971520,"kv-client":{"worker-concurrent":8,"worker-pool-size":0}}` conf := GetDefaultServerConfig() conf.Addr = "192.155.22.33:8887" From 75a5f1dfbf99fd08956c41ed695d6111feded3e7 Mon Sep 17 00:00:00 2001 From: JinlingChristopher Date: Tue, 22 Jun 2021 22:13:19 +0800 Subject: [PATCH 07/12] fix config conflict in marshal config. --- pkg/config/config_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/config/config_test.go b/pkg/config/config_test.go index ea53bc58dc8..96bb91646fc 100644 --- a/pkg/config/config_test.go +++ b/pkg/config/config_test.go @@ -80,7 +80,7 @@ var _ = check.Suite(&serverConfigSuite{}) func (s *serverConfigSuite) TestMarshal(c *check.C) { defer testleak.AfterTest(c)() - rawConfig := `{"addr":"192.155.22.33:8887","advertise-addr":"","log-file":"","log-level":"info","data-dir":"","gc-ttl":86400,"tz":"System","capture-session-ttl":10,"owner-flush-interval":200000000,"processor-flush-interval":100000000,"sorter":{"num-concurrent-worker":4,"chunk-size-limit":999,"max-memory-percentage":80,"max-memory-consumption":8589934592,"num-workerpool-goroutine":16,"sort-dir":"/tmp/sorter"},"security":{"ca-path":"","cert-path":"","key-path":"","cert-allowed-cn":null},"per-table-memory-quota":20971520,"kv-client":{"worker-concurrent":8,"worker-pool-size":0}}` + rawConfig := `{"addr":"192.155.22.33:8887","advertise-addr":"","log-file":"","log-level":"info","data-dir":"","gc-ttl":86400,"tz":"System","capture-session-ttl":10,"owner-flush-interval":200000000,"processor-flush-interval":100000000,"sorter":{"num-concurrent-worker":4,"chunk-size-limit":999,"max-memory-percentage":30,"max-memory-consumption":17179869184,"num-workerpool-goroutine":16,"sort-dir":"/tmp/sorter"},"security":{"ca-path":"","cert-path":"","key-path":"","cert-allowed-cn":null},"per-table-memory-quota":20971520,"kv-client":{"worker-concurrent":8,"worker-pool-size":0,"region-scan-limit":40}}` conf := GetDefaultServerConfig() conf.Addr = "192.155.22.33:8887" From 01b32a6afe320b12fc1f25db553a29abadd35b6e Mon Sep 17 00:00:00 2001 From: JinlingChristopher Date: Wed, 23 Jun 2021 00:18:13 +0800 Subject: [PATCH 08/12] try to fix tests. --- tests/_utils/run_cdc_server | 11 ++--------- tests/unified_sorter_sort_dir_conflict/run.sh | 10 +++++----- 2 files changed, 7 insertions(+), 14 deletions(-) diff --git a/tests/_utils/run_cdc_server b/tests/_utils/run_cdc_server index 2be79cc7a76..cee2712c70b 100755 --- a/tests/_utils/run_cdc_server +++ b/tests/_utils/run_cdc_server @@ -22,7 +22,6 @@ log_level=debug restart= failpoint=$GO_FAILPOINTS config_path= -sort_dir= data_dir= while [[ ${1} ]]; do @@ -71,8 +70,8 @@ while [[ ${1} ]]; do config_path="--config ${2}" shift ;; - --sort-dir) - sort_dir=${2} + --data-dir) + data_dir=${2} shift ;; *) @@ -86,10 +85,6 @@ while [[ ${1} ]]; do fi done -if [ -z "$sort_dir" ]; then - sort_dir=${workdir}/cdc-sort${logsuffix} -fi - if [ -z "$data_dir" ]; then data_dir=${workdir}/cdc_data${logsuffix} fi @@ -107,7 +102,6 @@ if [[ "$restart" == "true" ]]; then --log-level $log_level \ --sorter-num-workerpool-goroutine 4 \ --data-dir "$data_dir" \ - --sort-dir "$sort_dir" \ $config_path \ $tls \ $certcn \ @@ -124,7 +118,6 @@ else --log-level $log_level \ --sorter-num-workerpool-goroutine 4 \ --data-dir "$data_dir" \ - --sort-dir "$sort_dir" \ $config_path \ $tls \ $certcn \ diff --git a/tests/unified_sorter_sort_dir_conflict/run.sh b/tests/unified_sorter_sort_dir_conflict/run.sh index e26627b2b67..df3b9e3aaba 100644 --- a/tests/unified_sorter_sort_dir_conflict/run.sh +++ b/tests/unified_sorter_sort_dir_conflict/run.sh @@ -9,15 +9,15 @@ CDC_BINARY=cdc.test SINK_TYPE=$1 MAX_RETRIES=20 -function check_changefeed_mark_stopped_regex() { +function check_changefeed_mark_error_regex() { endpoints=$1 changefeedid=$2 error_msg=$3 info=$(cdc cli changefeed query --pd=$endpoints -c $changefeedid -s) echo "$info" state=$(echo $info|jq -r '.state') - if [[ ! "$state" == "stopped" ]]; then - echo "changefeed state $state does not equal to stopped" + if [[ ! "$state" == "error" ]]; then + echo "changefeed state $state does not equal to error" exit 1 fi message=$(echo $info|jq -r '.error.message') @@ -40,7 +40,7 @@ function prepare() { start_ts=$(run_cdc_cli tso query --pd=http://$UP_PD_HOST_1:$UP_PD_PORT_1) # starts the first cdc server instance. It will lock the sort-dir first. - run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --addr "127.0.0.1:8300" --logsuffix 1 --sort-dir /tmp/cdc_sort_1 + run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --addr "127.0.0.1:8300" --logsuffix 1 capture_pid=$(ps -C $CDC_BINARY -o pid= | awk '{print $1}') TOPIC_NAME="ticdc-simple-test-$RANDOM" @@ -60,7 +60,7 @@ function prepare() { sleep 20 # starts the first second server instance. It should fail, and bring down the changefeed - run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --addr "127.0.0.1:8301" --logsuffix 1 --sort-dir /tmp/cdc_sort_1 + run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --addr "127.0.0.1:8301" --logsuffix 1 --data-dir "${WORK_DIR}"/cdc_data1 ensure $MAX_RETRIES check_changefeed_mark_stopped_regex http://${UP_PD_HOST_1}:${UP_PD_PORT_1} ${changefeedid} ".*ErrConflictingFileLocks.*" kill $capture_pid From 2b891b09dd681ebd81145139924fbbdcc3700575 Mon Sep 17 00:00:00 2001 From: JinlingChristopher Date: Wed, 23 Jun 2021 00:24:44 +0800 Subject: [PATCH 09/12] fix backend_sorter_test. --- cdc/puller/sorter/backend_pool_test.go | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/cdc/puller/sorter/backend_pool_test.go b/cdc/puller/sorter/backend_pool_test.go index aa9b726533f..fb767afdd8d 100644 --- a/cdc/puller/sorter/backend_pool_test.go +++ b/cdc/puller/sorter/backend_pool_test.go @@ -111,9 +111,12 @@ func (s *backendPoolSuite) TestBasicFunction(c *check.C) { func (s *backendPoolSuite) TestDirectoryBadPermission(c *check.C) { defer testleak.AfterTest(c)() - dataDir := "tmp/cdc_data" + dataDir := c.MkDir() sortDir := filepath.Join(dataDir, config.DefaultSortDir) - err := os.MkdirAll(sortDir, 0o311) + err := os.MkdirAll(sortDir, 0o755) + c.Assert(err, check.IsNil) + + err = os.Chmod(sortDir, 0o311) // no permission to `ls` c.Assert(err, check.IsNil) conf := config.GetGlobalServerConfig() From 668386519d8f1c7f8c0917f649fc0ede16c8f338 Mon Sep 17 00:00:00 2001 From: amyangfei Date: Wed, 23 Jun 2021 10:33:06 +0800 Subject: [PATCH 10/12] Update pkg/config/config.go --- pkg/config/config.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/config/config.go b/pkg/config/config.go index 82c9dc38317..585fa97629b 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -32,7 +32,7 @@ import ( const ( // NewReplicaImpl is true if we using new processor // new owner should be also switched on after it implemented - NewReplicaImpl = true + NewReplicaImpl = false // DefaultSortDir is the default value of sort-dir, it will be s sub directory of data-dir. DefaultSortDir = "/tmp/sorter" ) From df4b94e895ff95c1d2c3e5166d91f8835cbdec76 Mon Sep 17 00:00:00 2001 From: JinlingChristopher Date: Wed, 23 Jun 2021 10:40:22 +0800 Subject: [PATCH 11/12] revert changes in unified sorter. --- tests/unified_sorter_sort_dir_conflict/run.sh | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/unified_sorter_sort_dir_conflict/run.sh b/tests/unified_sorter_sort_dir_conflict/run.sh index df3b9e3aaba..261d7741649 100644 --- a/tests/unified_sorter_sort_dir_conflict/run.sh +++ b/tests/unified_sorter_sort_dir_conflict/run.sh @@ -9,15 +9,15 @@ CDC_BINARY=cdc.test SINK_TYPE=$1 MAX_RETRIES=20 -function check_changefeed_mark_error_regex() { +function check_changefeed_mark_stopped_regex() { endpoints=$1 changefeedid=$2 error_msg=$3 info=$(cdc cli changefeed query --pd=$endpoints -c $changefeedid -s) echo "$info" state=$(echo $info|jq -r '.state') - if [[ ! "$state" == "error" ]]; then - echo "changefeed state $state does not equal to error" + if [[ ! "$state" == "stopped" ]]; then + echo "changefeed state $state does not equal to stopped" exit 1 fi message=$(echo $info|jq -r '.error.message') From 35de946f0194d6d071965d1a5d58046380cbf8c9 Mon Sep 17 00:00:00 2001 From: JinlingChristopher Date: Wed, 23 Jun 2021 10:48:21 +0800 Subject: [PATCH 12/12] use owner etcd client to get changefeed related info. --- cdc/server.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cdc/server.go b/cdc/server.go index 6b8eb70db0b..bdab7c4fd74 100644 --- a/cdc/server.go +++ b/cdc/server.go @@ -392,14 +392,14 @@ func (s *Server) setUpDataDir(ctx context.Context) error { } // data-dir will be decide by exist changefeed for backward compatibility - allStatus, err := s.etcdClient.GetAllChangeFeedStatus(ctx) + allStatus, err := s.owner.etcdClient.GetAllChangeFeedStatus(ctx) if err != nil { return errors.Trace(err) } candidates := make([]string, 0, len(allStatus)) for id := range allStatus { - info, err := s.etcdClient.GetChangeFeedInfo(ctx, id) + info, err := s.owner.etcdClient.GetChangeFeedInfo(ctx, id) if err != nil { return errors.Trace(err) }