Skip to content

Commit

Permalink
Merge branch 'release-5.4' into cherry-pick-5477-to-release-5.4
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot authored Jun 23, 2022
2 parents 29a0817 + 7575662 commit 10aecfc
Show file tree
Hide file tree
Showing 89 changed files with 2,744 additions and 488 deletions.
14 changes: 11 additions & 3 deletions cdc/capture/capture.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ import (
"github.com/pingcap/tiflow/pkg/etcd"
"github.com/pingcap/tiflow/pkg/orchestrator"
"github.com/pingcap/tiflow/pkg/p2p"
"github.com/pingcap/tiflow/pkg/pdtime"
"github.com/pingcap/tiflow/pkg/pdutil"
"github.com/pingcap/tiflow/pkg/version"
)

Expand All @@ -66,7 +66,7 @@ type Capture struct {
etcdClient *etcd.CDCEtcdClient
grpcPool kv.GrpcPool
regionCache *tikv.RegionCache
TimeAcquirer pdtime.TimeAcquirer
TimeAcquirer pdutil.TimeAcquirer
sorterSystem *ssystem.System

enableNewScheduler bool
Expand Down Expand Up @@ -143,7 +143,7 @@ func (c *Capture) reset(ctx context.Context) error {
if c.TimeAcquirer != nil {
c.TimeAcquirer.Stop()
}
c.TimeAcquirer, err = pdtime.NewTimeAcquirer(ctx, c.pdClient)
c.TimeAcquirer, err = pdutil.NewTimeAcquirer(ctx, c.pdClient)
if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -413,6 +413,14 @@ func (c *Capture) campaignOwner(ctx cdcContext.Context) error {
newGlobalVars.OwnerRevision = ownerRev
ownerCtx := cdcContext.NewContext(ctx, newGlobalVars)

// Update meta-region label to ensure that meta region isolated from data regions.
err = pdutil.UpdateMetaLabel(ctx, c.pdClient)
if err != nil {
log.Warn("Fail to verify region label rule",
zap.Error(err),
zap.String("captureID", c.info.ID))
}

log.Info("campaign owner successfully",
zap.String("captureID", c.info.ID),
zap.Int64("ownerRev", ownerRev))
Expand Down
4 changes: 2 additions & 2 deletions cdc/owner/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import (
"github.com/pingcap/tiflow/pkg/config"
cdcContext "github.com/pingcap/tiflow/pkg/context"
"github.com/pingcap/tiflow/pkg/orchestrator"
"github.com/pingcap/tiflow/pkg/pdtime"
"github.com/pingcap/tiflow/pkg/pdutil"
"github.com/pingcap/tiflow/pkg/txnutil/gc"
"github.com/pingcap/tiflow/pkg/version"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -219,7 +219,7 @@ func TestExecDDL(t *testing.T) {
AdvertiseAddr: "127.0.0.1:0000",
Version: version.ReleaseVersion,
},
TimeAcquirer: pdtime.NewTimeAcquirer4Test(),
TimeAcquirer: pdutil.NewTimeAcquirer4Test(),
})
ctx = cdcContext.WithChangefeedVars(ctx, &cdcContext.ChangefeedVars{
ID: "changefeed-id-test",
Expand Down
31 changes: 23 additions & 8 deletions cdc/redo/common/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,15 @@ import (
cerror "github.com/pingcap/tiflow/pkg/errors"
)

const (
// RedoLogFileFormatV1 was used before v6.1.0, which doesn't contain namespace information
// layout: captureID_changefeedID_fileType_maxEventCommitTs_uuid.fileExtName
RedoLogFileFormatV1 = "%s_%s_%s_%d_%s%s"
// RedoLogFileFormatV2 is available since v6.1.0, which contains namespace information
// layout: captureID_namespace_changefeedID_fileType_maxEventCommitTs_uuid.fileExtName
RedoLogFileFormatV2 = "%s_%s_%s_%s_%d_%s%s"
)

// InitS3storage init a storage used for s3,
// s3URI should be like s3URI="s3://logbucket/test-changefeed?endpoint=http://$S3_ENDPOINT/"
var InitS3storage = func(ctx context.Context, uri url.URL) (storage.ExternalStorage, error) {
Expand Down Expand Up @@ -57,6 +66,13 @@ var InitS3storage = func(ctx context.Context, uri url.URL) (storage.ExternalStor
return s3storage, nil
}

// logFormat2ParseFormat converts redo log file name format to the space separated
// format, which can be read and parsed by sscanf. Besides remove the suffix `%s`
// which is used as file name extension, since we will parse extension first.
func logFormat2ParseFormat(fmtStr string) string {
return strings.TrimSuffix(strings.ReplaceAll(fmtStr, "_", " "), "%s")
}

// ParseLogFileName extract the commitTs, fileType from log fileName
func ParseLogFileName(name string) (uint64, string, error) {
ext := filepath.Ext(name)
Expand All @@ -65,7 +81,9 @@ func ParseLogFileName(name string) (uint64, string, error) {
}

// if .sort, the name should be like
// fmt.Sprintf("%s_%s_%d_%s_%d%s", w.cfg.captureID, w.cfg.changeFeedID, w.cfg.createTime.Unix(), w.cfg.fileType, w.commitTS.Load(), LogEXT)+SortLogEXT
// fmt.Sprintf("%s_%s_%s_%d_%s_%d%s", w.cfg.captureID,
// w.cfg.changeFeedID.Namespace,w.cfg.changeFeedID.ID,
// w.cfg.fileType, w.commitTS.Load(), uuid, LogEXT)+SortLogEXT
if ext == SortLogEXT {
name = strings.TrimSuffix(name, SortLogEXT)
ext = filepath.Ext(name)
Expand All @@ -74,15 +92,12 @@ func ParseLogFileName(name string) (uint64, string, error) {
return 0, "", nil
}

var commitTs, d1 uint64
var s1, s2, fileType string
var commitTs uint64
var s1, s2, fileType, uid string
// the log looks like: fmt.Sprintf("%s_%s_%d_%s_%d%s", w.cfg.captureID, w.cfg.changeFeedID, w.cfg.createTime.Unix(), w.cfg.fileType, w.commitTS.Load(), redo.LogEXT)
formatStr := "%s %s %d %s %d" + LogEXT
if ext == TmpEXT {
formatStr += TmpEXT
}
formatStr := logFormat2ParseFormat(RedoLogFileFormatV1)
name = strings.ReplaceAll(name, "_", " ")
_, err := fmt.Sscanf(name, formatStr, &s1, &s2, &d1, &fileType, &commitTs)
_, err := fmt.Sscanf(name, formatStr, &s1, &s2, &fileType, &commitTs, &uid)
if err != nil {
return 0, "", errors.Annotatef(err, "bad log name: %s", name)
}
Expand Down
77 changes: 69 additions & 8 deletions cdc/redo/common/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,15 @@ package common
import (
"fmt"
"testing"
"time"

"github.com/google/uuid"
"github.com/stretchr/testify/require"
)

func TestParseLogFileName(t *testing.T) {
type arg struct {
name string
}
// the log looks like: fmt.Sprintf("%s_%s_%d_%s_%d%s", w.cfg.captureID, w.cfg.changeFeedID, w.cfg.createTime.Unix(), w.cfg.fileType, w.commitTS.Load(), redo.LogEXT)
tests := []struct {
name string
args arg
Expand All @@ -36,39 +35,99 @@ func TestParseLogFileName(t *testing.T) {
{
name: "happy row .log",
args: arg{
name: fmt.Sprintf("%s_%s_%d_%s_%d%s", "cp", "test", time.Now().Unix(), DefaultRowLogFileType, 1, LogEXT),
name: fmt.Sprintf(RedoLogFileFormatV1, "cp",
"test",
DefaultRowLogFileType, 1, uuid.New().String(), LogEXT),
},
wantTs: 1,
wantFileType: DefaultRowLogFileType,
},
{
name: "happy row .log",
args: arg{
name: fmt.Sprintf(RedoLogFileFormatV1, "cp",
"test",
DefaultRowLogFileType, 1, uuid.New().String(), LogEXT),
},
wantTs: 1,
wantFileType: DefaultRowLogFileType,
},
{
name: "happy row .tmp",
args: arg{
name: fmt.Sprintf(RedoLogFileFormatV1, "cp",
"test",
DefaultRowLogFileType, 1, uuid.New().String(), LogEXT) + TmpEXT,
},
wantTs: 1,
wantFileType: DefaultRowLogFileType,
},
{
name: "happy row .tmp",
args: arg{
name: fmt.Sprintf("%s_%s_%d_%s_%d%s", "cp", "test", time.Now().Unix(), DefaultRowLogFileType, 1, LogEXT) + TmpEXT,
name: fmt.Sprintf(RedoLogFileFormatV1, "cp",
"test",
DefaultRowLogFileType, 1, uuid.New().String(), LogEXT) + TmpEXT,
},
wantTs: 1,
wantFileType: DefaultRowLogFileType,
},
{
name: "happy ddl .log",
args: arg{
name: fmt.Sprintf("%s_%s_%d_%s_%d%s", "cp", "test", time.Now().Unix(), DefaultDDLLogFileType, 1, LogEXT),
name: fmt.Sprintf(RedoLogFileFormatV1, "cp",
"test",
DefaultDDLLogFileType, 1, uuid.New().String(), LogEXT),
},
wantTs: 1,
wantFileType: DefaultDDLLogFileType,
},
{
name: "happy ddl .log",
args: arg{
name: fmt.Sprintf(RedoLogFileFormatV1, "cp",
"test",
DefaultDDLLogFileType, 1, uuid.New().String(), LogEXT),
},
wantTs: 1,
wantFileType: DefaultDDLLogFileType,
},
{
name: "happy ddl .sort",
args: arg{
name: fmt.Sprintf("%s_%s_%d_%s_%d%s", "cp", "test", time.Now().Unix(), DefaultDDLLogFileType, 1, LogEXT) + SortLogEXT,
name: fmt.Sprintf(RedoLogFileFormatV1, "cp",
"test",
DefaultDDLLogFileType, 1, uuid.New().String(), LogEXT) + SortLogEXT,
},
wantTs: 1,
wantFileType: DefaultDDLLogFileType,
},
{
name: "happy ddl .sort",
args: arg{
name: fmt.Sprintf(RedoLogFileFormatV1, "cp",
"test",
DefaultDDLLogFileType, 1, uuid.New().String(), LogEXT) + SortLogEXT,
},
wantTs: 1,
wantFileType: DefaultDDLLogFileType,
},
{
name: "happy ddl .tmp",
args: arg{
name: fmt.Sprintf(RedoLogFileFormatV1, "cp",
"test",
DefaultDDLLogFileType, 1, uuid.New().String(), LogEXT) + TmpEXT,
},
wantTs: 1,
wantFileType: DefaultDDLLogFileType,
},
{
name: "happy ddl .tmp",
args: arg{
name: fmt.Sprintf("%s_%s_%d_%s_%d%s", "cp", "test", time.Now().Unix(), DefaultDDLLogFileType, 1, LogEXT) + TmpEXT,
name: fmt.Sprintf(RedoLogFileFormatV1, "cp",
"test",
DefaultDDLLogFileType, 1, uuid.New().String(), LogEXT) + TmpEXT,
},
wantTs: 1,
wantFileType: DefaultDDLLogFileType,
Expand All @@ -90,7 +149,9 @@ func TestParseLogFileName(t *testing.T) {
{
name: "err wrong format ddl .tmp",
args: arg{
name: fmt.Sprintf("%s_%s_%d_%s%d%s", "cp", "test", time.Now().Unix(), DefaultDDLLogFileType, 1, LogEXT) + TmpEXT,
name: fmt.Sprintf("%s_%s_%s_%d%s%s", /* a wrong format */
"cp", "test",
DefaultDDLLogFileType, 1, uuid.New().String(), LogEXT) + TmpEXT,
},
wantErr: ".*bad log name*.",
},
Expand Down
7 changes: 5 additions & 2 deletions cdc/redo/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,8 +325,11 @@ func (m *ManagerImpl) updateTableResolvedTs(ctx context.Context) error {
return err
}
minResolvedTs := uint64(math.MaxUint64)
for tableID, rts := range rtsMap {
m.rtsMap[tableID] = rts
for tableID := range m.rtsMap {
if rts, ok := rtsMap[tableID]; ok {
m.rtsMap[tableID] = rts
}
rts := m.rtsMap[tableID]
if rts < minResolvedTs {
minResolvedTs = rts
}
Expand Down
75 changes: 75 additions & 0 deletions cdc/redo/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package redo

import (
"context"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -173,6 +174,80 @@ func TestLogManagerInProcessor(t *testing.T) {
require.Nil(t, err)
}

// TestUpdateResolvedTsWithDelayedTable tests redo manager doesn't move resolved
// ts forward if one or more tables resolved ts are not returned from underlying
// writer, this secenario happens when there is no data or resolved ts of this
// table sent to redo log writer yet.
func TestUpdateResolvedTsWithDelayedTable(t *testing.T) {
t.Parallel()

ctx, cancel := context.WithCancel(context.Background())
cfg := &config.ConsistentConfig{
Level: string(ConsistentLevelEventual),
Storage: "blackhole://",
}
errCh := make(chan error, 1)
opts := &ManagerOptions{
EnableBgRunner: true,
ErrCh: errCh,
}
logMgr, err := NewManager(ctx, cfg, opts)
require.Nil(t, err)

var (
table53 = int64(53)
table55 = int64(55)
table57 = int64(57)

startTs = uint64(100)
table53Ts = uint64(125)
table55Ts = uint64(120)
table57Ts = uint64(110)
)
tables := []model.TableID{table53, table55, table57}
for _, tableID := range tables {
logMgr.AddTable(tableID, startTs)
}
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
logMgr.bgWriteLog(ctx, errCh)
}()

// table 53 has new data, resolved-ts moves forward to 125
rows := []*model.RowChangedEvent{
{CommitTs: table53Ts, Table: &model.TableName{TableID: table53}},
{CommitTs: table53Ts, Table: &model.TableName{TableID: table53}},
}
err = logMgr.EmitRowChangedEvents(ctx, table53, rows...)
require.Nil(t, err)
require.Eventually(t, func() bool {
tsMap, err := logMgr.writer.GetCurrentResolvedTs(ctx, []int64{table53})
require.Nil(t, err)
ts, ok := tsMap[table53]
return ok && ts == table53Ts
}, time.Second, time.Millisecond*10)

// table 55 has no data, but receives resolved-ts event and moves forward to 120
err = logMgr.FlushLog(ctx, table55, table55Ts)
require.Nil(t, err)

// get min resolved ts should take each table into consideration
err = logMgr.updateTableResolvedTs(ctx)
require.Nil(t, err)
require.Equal(t, startTs, logMgr.GetMinResolvedTs())

// table 57 moves forward, update table resolved ts and check again
logMgr.FlushLog(ctx, table57, table57Ts)
err = logMgr.updateTableResolvedTs(ctx)
require.Nil(t, err)
require.Equal(t, table57Ts, logMgr.GetMinResolvedTs())

cancel()
wg.Wait()
}

// TestLogManagerInOwner tests how redo log manager is used in owner,
// where the redo log manager needs to handle DDL event only.
func TestLogManagerInOwner(t *testing.T) {
Expand Down
Loading

0 comments on commit 10aecfc

Please sign in to comment.