Skip to content

Commit

Permalink
Merge branch 'master' into lightning-speed
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot authored Sep 28, 2022
2 parents 1e71f13 + c240653 commit 67524d6
Show file tree
Hide file tree
Showing 8 changed files with 71 additions and 18 deletions.
4 changes: 3 additions & 1 deletion cdc/capture/capture.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ import (
"golang.org/x/time/rate"
)

// cleanMetaDuration is the timeout applied to the context used for deleting
// this capture's info from etcd when the capture exits.
const cleanMetaDuration = 10 * time.Second

// Capture represents a Capture server, it monitors the changefeed
// information in etcd and schedules Task on it.
type Capture interface {
Expand Down Expand Up @@ -282,7 +284,7 @@ func (c *captureImpl) run(stdCtx context.Context) error {
return errors.Trace(err)
}
defer func() {
timeoutCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
timeoutCtx, cancel := context.WithTimeout(context.Background(), cleanMetaDuration)
if err := c.EtcdClient.DeleteCaptureInfo(timeoutCtx, c.info.ID); err != nil {
log.Warn("failed to delete capture info when capture exited",
zap.String("captureID", c.info.ID),
Expand Down
2 changes: 1 addition & 1 deletion cdc/owner/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -703,7 +703,7 @@ func (c *changefeed) preflightCheck(captures map[model.CaptureID]*model.CaptureI
})
ok = false
}

// clean stale capture task positions
for captureID := range c.state.TaskPositions {
if _, exist := captures[captureID]; !exist {
c.state.PatchTaskPosition(captureID, func(position *model.TaskPosition) (*model.TaskPosition, bool, error) {
Expand Down
6 changes: 2 additions & 4 deletions cdc/processor/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,11 +126,9 @@ func (m *managerImpl) Tick(stdCtx context.Context, state orchestrator.ReactorSta
Info: changefeedState.Info,
})
if err := p.Tick(ctx); err != nil {
// the processor has already patched its error to tell the owner,
// so the manager can just close the processor and continue to tick other processors
m.closeProcessor(changefeedID)
if cerrors.ErrReactorFinished.Equal(errors.Cause(err)) {
continue
}
return state, errors.Trace(err)
}
}
// check if the processors in memory is leaked
Expand Down
11 changes: 6 additions & 5 deletions cdc/processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -471,7 +471,8 @@ func isProcessorIgnorableError(err error) bool {

// Tick implements the `orchestrator.State` interface
// the `state` parameter is sent by the etcd worker, the `state` must be a snapshot of KVs in etcd
// The main logic of processor is in this function, including the calculation of many kinds of ts, maintain table pipeline, error handling, etc.
// The main logic of the processor is in this function, including the calculation of many kinds of ts,
// maintaining the table pipelines, error handling, etc.
func (p *processor) Tick(ctx cdcContext.Context) error {
// check upstream error first
if err := p.upstream.Error(); err != nil {
Expand Down Expand Up @@ -507,13 +508,13 @@ func (p *processor) Tick(ctx cdcContext.Context) error {
p.metricProcessorTickDuration.Observe(costTime.Seconds())
p.refreshMetrics()

if err == nil {
return nil
}
return p.handleErr(err)
}

func (p *processor) handleErr(err error) error {
if err == nil {
return nil
}
if isProcessorIgnorableError(err) {
log.Info("processor exited",
zap.String("capture", p.captureInfo.ID),
Expand Down Expand Up @@ -547,7 +548,7 @@ func (p *processor) handleErr(err error) error {
zap.String("namespace", p.changefeedID.Namespace),
zap.String("changefeed", p.changefeedID.ID),
zap.Error(err))
return cerror.ErrReactorFinished.GenWithStackByArgs()
return err
}

func (p *processor) tick(ctx cdcContext.Context) error {
Expand Down
4 changes: 2 additions & 2 deletions cdc/processor/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -547,7 +547,7 @@ func TestProcessorError(t *testing.T) {
p.sendError(cerror.ErrSinkURIInvalid)
err = p.Tick(ctx)
tester.MustApplyPatches()
require.True(t, cerror.ErrReactorFinished.Equal(errors.Cause(err)))
require.Error(t, err)
require.Equal(t, p.changefeed.TaskPositions[p.captureInfo.ID], &model.TaskPosition{
Error: &model.RunningError{
Addr: "127.0.0.1:0000",
Expand Down Expand Up @@ -660,7 +660,7 @@ func TestProcessorClose(t *testing.T) {
// send error
p.sendError(cerror.ErrSinkURIInvalid)
err = p.Tick(ctx)
require.True(t, cerror.ErrReactorFinished.Equal(errors.Cause(err)))
require.Error(t, err)
tester.MustApplyPatches()

require.Nil(t, p.Close())
Expand Down
22 changes: 19 additions & 3 deletions pkg/etcd/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -547,10 +547,26 @@ func (c *CDCEtcdClientImpl) PutCaptureInfo(
return cerror.WrapError(cerror.ErrPDEtcdAPIError, err)
}

// DeleteCaptureInfo delete capture info from etcd.
func (c *CDCEtcdClientImpl) DeleteCaptureInfo(ctx context.Context, id string) error {
key := GetEtcdKeyCaptureInfo(c.ClusterID, id)
// DeleteCaptureInfo deletes all capture related info from etcd.
//
// It first deletes the capture info key of captureID; if that delete fails the
// error is returned wrapped as ErrPDEtcdAPIError and nothing else is attempted.
// It then deletes, by prefix, the task position keys stored for this capture
// under the default namespace. A failure of that second delete is logged and
// returned wrapped as ErrPDEtcdAPIError.
func (c *CDCEtcdClientImpl) DeleteCaptureInfo(ctx context.Context, captureID string) error {
	key := GetEtcdKeyCaptureInfo(c.ClusterID, captureID)
	_, err := c.Client.Delete(ctx, key)
	if err != nil {
		return cerror.WrapError(cerror.ErrPDEtcdAPIError, err)
	}
	// we need to clean all task positions related to this capture when the capture is offline,
	// otherwise the task positions may leak
	// FIXME (dongmen 2022.9.28): find a way to use changefeed's namespace
	taskKey := TaskPositionKeyPrefix(c.ClusterID, model.DefaultNamespace)
	// the taskKey format is /tidb/cdc/{clusterID}/{namespace}/task/position/{captureID}
	taskKey = fmt.Sprintf("%s/%s", taskKey, captureID)
	_, err = c.Client.Delete(ctx, taskKey, clientv3.WithPrefix())
	if err != nil {
		log.Warn("delete task position failed",
			zap.String("clusterID", c.ClusterID),
			zap.String("captureID", captureID),
			// report the key whose delete actually failed, not the capture info key
			zap.String("key", taskKey), zap.Error(err))
	}
	// NOTE(review): relies on WrapError passing a nil err through as nil on success — confirm
	return cerror.WrapError(cerror.ErrPDEtcdAPIError, err)
}

Expand Down
37 changes: 36 additions & 1 deletion pkg/etcd/etcd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -328,7 +328,7 @@ func TestGetOwnerRevision(t *testing.T) {
wg sync.WaitGroup
)

// We will create 3 mock captures and they take turns to be the owner.
// We will create 3 mock captures, and they will become the owner one by one.
// While each is the owner, it tries to get its owner revision, and
// checks that the global monotonicity is guaranteed.

Expand Down Expand Up @@ -408,3 +408,38 @@ func TestMigrateBackupKey(t *testing.T) {
key = MigrateBackupKey(1, "abcdc")
require.Equal(t, "/tidb/cdc/__backup__/1/abcdc", key)
}

// TestDeleteCaptureInfo writes a changefeed status key and a task position key
// for a capture, calls DeleteCaptureInfo, and verifies that the task position
// key of that capture is gone afterwards.
func TestDeleteCaptureInfo(t *testing.T) {
	tester := &Tester{}
	tester.SetUpTest(t)
	defer tester.TearDownTest(t)

	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()
	captureID := "test-capture-id"

	statuses := map[model.ChangeFeedID]model.ChangeFeedStatus{
		model.DefaultChangeFeedID("test-cf-1"): {ResolvedTs: 1},
	}

	// seed etcd with one status key and one task position key per changefeed
	for cfID, cfStatus := range statuses {
		encoded, err := cfStatus.Marshal()
		require.NoError(t, err)

		statusPrefix := ChangefeedStatusKeyPrefix(DefaultCDCClusterID, cfID.Namespace)
		_, err = tester.client.Client.Put(ctx, fmt.Sprintf("%s/%s", statusPrefix, cfID.ID), encoded)
		require.NoError(t, err)

		positionKey := GetEtcdKeyTaskPosition(DefaultCDCClusterID, cfID, captureID)
		_, err = tester.client.Client.Put(ctx, positionKey, fmt.Sprintf("task-%s", cfID.ID))
		require.NoError(t, err)
	}

	deleteErr := tester.client.DeleteCaptureInfo(ctx, captureID)
	require.NoError(t, deleteErr)

	// no task position of the deleted capture may remain
	for cfID := range statuses {
		positionKey := GetEtcdKeyTaskPosition(DefaultCDCClusterID, cfID, captureID)
		resp, err := tester.client.Client.Get(ctx, positionKey)
		require.NoError(t, err)
		require.Equal(t, 0, len(resp.Kvs))
	}
}
3 changes: 2 additions & 1 deletion pkg/orchestrator/etcd_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,8 @@ func (worker *EtcdWorker) Run(ctx context.Context, session *concurrency.Session,
}
} else {
if exiting {
// If exiting is true here, it means that the reactor returned `ErrReactorFinished` last tick, and all pending patches is applied.
// If exiting is true here, it means that the reactor returned `ErrReactorFinished` last tick,
// and all pending patches have been applied.
return nil
}
if worker.revision < worker.barrierRev {
Expand Down

0 comments on commit 67524d6

Please sign in to comment.