Skip to content

Commit cd41921

Browse files
authored
refine barrier (#342)
1 parent cd2da17 commit cd41921

8 files changed

+590
-414
lines changed

coordinator/coordinator.go

+2-33
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,12 @@ import (
1919
"sync"
2020
"time"
2121

22-
"github.com/flowbehappy/tigate/pkg/common"
23-
"github.com/flowbehappy/tigate/pkg/node"
24-
2522
"github.com/flowbehappy/tigate/heartbeatpb"
23+
"github.com/flowbehappy/tigate/pkg/common"
2624
appcontext "github.com/flowbehappy/tigate/pkg/common/context"
2725
"github.com/flowbehappy/tigate/pkg/messaging"
2826
"github.com/flowbehappy/tigate/pkg/metrics"
27+
"github.com/flowbehappy/tigate/pkg/node"
2928
"github.com/flowbehappy/tigate/scheduler"
3029
"github.com/pingcap/log"
3130
"github.com/pingcap/tiflow/cdc/model"
@@ -145,7 +144,6 @@ func (c *coordinator) Tick(
145144
// 5. send saved checkpoint ts to maintainer
146145
c.sendSavedCheckpointTsToMaintainer()
147146

148-
c.printStatus()
149147
return state, nil
150148
}
151149

@@ -425,32 +423,3 @@ func (c *coordinator) sendSavedCheckpointTsToMaintainer() {
425423
}
426424
}
427425
}
428-
429-
func (c *coordinator) printStatus() {
430-
if time.Since(c.lastCheckTime) > time.Second*10 {
431-
workingTask := 0
432-
absentTask := 0
433-
commitTask := 0
434-
removingTask := 0
435-
for _, value := range c.supervisor.StateMachines {
436-
switch value.State {
437-
case scheduler.SchedulerStatusAbsent:
438-
absentTask++
439-
case scheduler.SchedulerStatusCommiting:
440-
commitTask++
441-
case scheduler.SchedulerStatusWorking:
442-
workingTask++
443-
case scheduler.SchedulerStatusRemoving:
444-
removingTask++
445-
}
446-
}
447-
log.Info("changefeed status",
448-
zap.Int("absent", absentTask),
449-
zap.Int("commit", commitTask),
450-
zap.Int("working", workingTask),
451-
zap.Int("removing", removingTask),
452-
zap.Any("runningTask", len(c.supervisor.RunningTasks)),
453-
)
454-
c.lastCheckTime = time.Now()
455-
}
456-
}

0 commit comments

Comments
 (0)