Skip to content

Commit 02ea496

Browse files
authored
Move the remove dispatcher logic to op controller (#369)
* update some filed name * move the remove logic to operator_controller.go
1 parent 5d8bf8a commit 02ea496

7 files changed

+97
-66
lines changed

maintainer/maintainer.go

+8-2
Original file line numberDiff line numberDiff line change
@@ -48,11 +48,17 @@ import (
4848
"go.uber.org/zap"
4949
)
5050

51-
// Maintainer is response for handle changefeed replication tasks, Maintainer should:
52-
// 1. schedules tables to ticdc watcher
51+
// Maintainer is responsible for handling changefeed replication tasks. Maintainer should:
52+
// 1. schedules tables to dispatcher manager
5353
// 2. calculate changefeed checkpoint ts
5454
// 3. send changefeed status to coordinator
5555
// 4. handle heartbeat reported by dispatcher
56+
// there are four threads in maintainer:
57+
// 1. controller thread, handled in dynstream; it handles the main logic of the maintainer, such as barrier and heartbeat
58+
// 2. scheduler thread, handled in threadpool, it schedules the tables to dispatcher manager
59+
// 3. operator controller thread, handled in threadpool, it runs the operators
60+
// 4. checker controller, handled in threadpool, it runs the checkers to dynamically adjust the schedule
61+
// all threads read/write information from/to the ReplicationDB
5662
type Maintainer struct {
5763
id model.ChangeFeedID
5864
config *config.ChangeFeedInfo

maintainer/maintainer_controller.go

+40-40
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ import (
3939
)
4040

4141
// Controller schedules and balances tables
42+
// there are 3 main components in the controller, scheduler, ReplicationDB and operator controller
4243
type Controller struct {
4344
// initialTables holds all tables that exist before the controller is bootstrapped
4445
initialTables []commonEvent.Table
@@ -93,6 +94,37 @@ func NewController(changefeedID string,
9394
return s
9495
}
9596

97+
// HandleStatus handle the status report from the node
98+
func (c *Controller) HandleStatus(from node.ID, statusList []*heartbeatpb.TableSpanStatus) {
99+
for _, status := range statusList {
100+
dispatcherID := common.NewDispatcherIDFromPB(status.ID)
101+
stm := c.GetTask(dispatcherID)
102+
if stm == nil {
103+
log.Warn("no span found, ignore",
104+
zap.String("changefeed", c.changefeedID),
105+
zap.String("from", from.String()),
106+
zap.Any("status", status),
107+
zap.String("span", dispatcherID.String()))
108+
if status.ComponentStatus == heartbeatpb.ComponentState_Working {
109+
// if the span is not found, and the status is working, we need to remove it from dispatcher
110+
_ = c.messageCenter.SendCommand(replica.NewRemoveInferiorMessage(from, c.changefeedID, status.ID))
111+
}
112+
continue
113+
}
114+
c.operatorController.UpdateOperatorStatus(dispatcherID, from, status)
115+
nodeID := stm.GetNodeID()
116+
if nodeID != from {
117+
// todo: handle the case that the node id is mismatch
118+
log.Warn("node id not match",
119+
zap.String("changefeed", c.changefeedID),
120+
zap.Any("from", from),
121+
zap.Stringer("node", nodeID))
122+
continue
123+
}
124+
stm.UpdateStatus(status)
125+
}
126+
}
127+
96128
func (c *Controller) GetTasksBySchemaID(schemaID int64) []*replica.SpanReplication {
97129
return c.replicationDB.GetTasksBySchemaID(schemaID)
98130
}
@@ -207,25 +239,22 @@ func (c *Controller) GetTask(dispatcherID common.DispatcherID) *replica.SpanRepl
207239
return c.replicationDB.GetTaskByID(dispatcherID)
208240
}
209241

210-
// RemoveAllTasks remove all tasks, todo: move these 3 methods to the operator controller
242+
// RemoveAllTasks remove all tasks
211243
func (c *Controller) RemoveAllTasks() {
212-
for _, replicaSet := range c.replicationDB.TryRemoveAll() {
213-
c.operatorController.ReplicaSetRemoved(operator.NewRemoveDispatcherOperator(c.replicationDB, replicaSet))
214-
}
244+
c.operatorController.RemoveAllTasks()
215245
}
216246

247+
// RemoveTasksBySchemaID remove all tasks by schema id
217248
func (c *Controller) RemoveTasksBySchemaID(schemaID int64) {
218-
for _, replicaSet := range c.replicationDB.TryRemoveBySchemaID(schemaID) {
219-
c.operatorController.ReplicaSetRemoved(operator.NewRemoveDispatcherOperator(c.replicationDB, replicaSet))
220-
}
249+
c.operatorController.RemoveTasksBySchemaID(schemaID)
221250
}
222251

252+
// RemoveTasksByTableIDs remove all tasks by table id
223253
func (c *Controller) RemoveTasksByTableIDs(tables ...int64) {
224-
for _, replicaSet := range c.replicationDB.TryRemoveByTableIDs(tables...) {
225-
c.operatorController.ReplicaSetRemoved(operator.NewRemoveDispatcherOperator(c.replicationDB, replicaSet))
226-
}
254+
c.operatorController.RemoveTasksByTableIDs(tables...)
227255
}
228256

257+
// GetTasksByTableIDs get all tasks by table id
229258
func (c *Controller) GetTasksByTableIDs(tableIDs ...int64) []*replica.SpanReplication {
230259
return c.replicationDB.GetTasksByTableIDs(tableIDs...)
231260
}
@@ -236,6 +265,7 @@ func (c *Controller) UpdateSchemaID(tableID, newSchemaID int64) {
236265
c.replicationDB.UpdateSchemaID(tableID, newSchemaID)
237266
}
238267

268+
// RemoveNode is called when a node is removed
239269
func (c *Controller) RemoveNode(id node.ID) {
240270
c.operatorController.OnNodeRemoved(id)
241271
}
@@ -245,36 +275,6 @@ func (c *Controller) ScheduleFinished() bool {
245275
return c.replicationDB.GetAbsentSize() == 0 && c.operatorController.OperatorSize() == 0
246276
}
247277

248-
func (c *Controller) HandleStatus(from node.ID, statusList []*heartbeatpb.TableSpanStatus) {
249-
for _, status := range statusList {
250-
dispatcherID := common.NewDispatcherIDFromPB(status.ID)
251-
stm := c.GetTask(dispatcherID)
252-
if stm == nil {
253-
log.Warn("no span found, ignore",
254-
zap.String("changefeed", c.changefeedID),
255-
zap.String("from", from.String()),
256-
zap.Any("status", status),
257-
zap.String("span", dispatcherID.String()))
258-
if status.ComponentStatus == heartbeatpb.ComponentState_Working {
259-
// if the span is not found, and the status is working, we need to remove it from dispatcher
260-
_ = c.messageCenter.SendCommand(replica.NewRemoveInferiorMessage(from, c.changefeedID, status.ID))
261-
}
262-
continue
263-
}
264-
c.operatorController.UpdateOperatorStatus(dispatcherID, from, status)
265-
nodeID := stm.GetNodeID()
266-
if nodeID != from {
267-
// todo: handle the case that the node id is mismatch
268-
log.Warn("node id not match",
269-
zap.String("changefeed", c.changefeedID),
270-
zap.Any("from", from),
271-
zap.Stringer("node", nodeID))
272-
continue
273-
}
274-
stm.UpdateStatus(status)
275-
}
276-
}
277-
278278
func (c *Controller) TaskSize() int {
279279
return c.replicationDB.TaskSize()
280280
}

maintainer/operator/operator_add.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,6 @@ func (m *AddDispatcherOperator) PostFinish() {
9494
}
9595

9696
func (m *AddDispatcherOperator) String() string {
97-
return fmt.Sprintf("add dispatcher operator: %s, dest:%s ",
97+
return fmt.Sprintf("add dispatcher operator: %s, dest:%s",
9898
m.replicaSet.ID, m.dest)
9999
}

maintainer/operator/operator_controller.go

+42-12
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@ import (
2727
"go.uber.org/zap"
2828
)
2929

30+
// Controller is the operator controller, it manages all operators.
31+
// And the Controller is responsible for the execution of the operator.
3032
type Controller struct {
3133
changefeedID string
3234
replicationDB *replica.ReplicationDB
@@ -80,21 +82,35 @@ func (oc *Controller) Execute() time.Time {
8082
}
8183
}
8284

83-
// ReplicaSetRemoved if the replica set is removed,
84-
// the controller will remove the operator. add a new operator to the controller.
85-
func (oc *Controller) ReplicaSetRemoved(op *RemoveDispatcherOperator) {
85+
// RemoveAllTasks remove all tasks, and notify all operators to stop.
86+
// it is only called by the barrier when the changefeed is stopped.
87+
func (oc *Controller) RemoveAllTasks() {
8688
oc.lock.Lock()
8789
defer oc.lock.Unlock()
8890

89-
if old, ok := oc.operators[op.ID()]; ok {
90-
log.Info("replica set is removed , replace the old one",
91-
zap.String("changefeed", oc.changefeedID),
92-
zap.String("replicaset", op.ID().String()),
93-
zap.String("operator", op.String()))
94-
old.OnTaskRemoved()
95-
delete(oc.operators, op.ID())
91+
for _, replicaSet := range oc.replicationDB.TryRemoveAll() {
92+
oc.removeReplicaSet(NewRemoveDispatcherOperator(oc.replicationDB, replicaSet))
93+
}
94+
}
95+
96+
// RemoveTasksBySchemaID remove all tasks by schema id.
97+
// it is only called by the barrier when the schema is dropped by ddl
98+
func (oc *Controller) RemoveTasksBySchemaID(schemaID int64) {
99+
oc.lock.Lock()
100+
defer oc.lock.Unlock()
101+
for _, replicaSet := range oc.replicationDB.TryRemoveBySchemaID(schemaID) {
102+
oc.removeReplicaSet(NewRemoveDispatcherOperator(oc.replicationDB, replicaSet))
103+
}
104+
}
105+
106+
// RemoveTasksByTableIDs remove all tasks by table ids.
107+
// it is only called by the barrier when the table is dropped by ddl
108+
func (oc *Controller) RemoveTasksByTableIDs(tables ...int64) {
109+
oc.lock.Lock()
110+
defer oc.lock.Unlock()
111+
for _, replicaSet := range oc.replicationDB.TryRemoveByTableIDs(tables...) {
112+
oc.removeReplicaSet(NewRemoveDispatcherOperator(oc.replicationDB, replicaSet))
96113
}
97-
oc.operators[op.ID()] = op
98114
}
99115

100116
// AddOperator adds an operator to the controller, if the operator already exists, return false.
@@ -110,7 +126,7 @@ func (oc *Controller) AddOperator(op Operator) bool {
110126
}
111127
span := oc.replicationDB.GetTaskByID(op.ID())
112128
if span == nil {
113-
log.Warn("span not found",
129+
log.Warn("add operator failed, span not found",
114130
zap.String("changefeed", oc.changefeedID),
115131
zap.String("operator", op.String()))
116132
return false
@@ -198,3 +214,17 @@ func (oc *Controller) pollQueueingOperator() (Operator, bool) {
198214
heap.Push(&oc.runningQueue, item)
199215
return op, true
200216
}
217+
218+
// removeReplicaSet is called when the replica set is removed,
219+
// the controller will remove the existing operator (if any) and add the new remove operator in its place.
220+
func (oc *Controller) removeReplicaSet(op *RemoveDispatcherOperator) {
221+
if old, ok := oc.operators[op.ID()]; ok {
222+
log.Info("replica set is removed , replace the old one",
223+
zap.String("changefeed", oc.changefeedID),
224+
zap.String("replicaset", op.ID().String()),
225+
zap.String("operator", op.String()))
226+
old.OnTaskRemoved()
227+
delete(oc.operators, op.ID())
228+
}
229+
oc.operators[op.ID()] = op
230+
}

maintainer/operator/operator_move.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,6 @@ func (m *MoveDispatcherOperator) String() string {
154154
m.lck.Lock()
155155
defer m.lck.Unlock()
156156

157-
return fmt.Sprintf("move dispatcher operator: %s, origin:%s, dest:%s ",
157+
return fmt.Sprintf("move dispatcher operator: %s, origin:%s, dest:%s",
158158
m.replicaSet.ID, m.origin, m.dest)
159159
}

maintainer/operator/operator_split.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,6 @@ func (m *SplitDispatcherOperator) Start() {
8383

8484
func (m *SplitDispatcherOperator) String() string {
8585
// todo add split region span
86-
return fmt.Sprintf("move dispatcher operator: %s, dest:%v ",
86+
return fmt.Sprintf("move dispatcher operator: %s, splitSpans:%v",
8787
m.replicaSet.ID, m.splitSpans)
8888
}

maintainer/replica/span_replication.go

+4-9
Original file line numberDiff line numberDiff line change
@@ -92,9 +92,8 @@ func NewWorkingReplicaSet(
9292
return r
9393
}
9494

95-
func (r *SpanReplication) UpdateStatus(status any) {
96-
if status != nil {
97-
newStatus := status.(*heartbeatpb.TableSpanStatus)
95+
func (r *SpanReplication) UpdateStatus(newStatus *heartbeatpb.TableSpanStatus) {
96+
if newStatus != nil {
9897
if newStatus.CheckpointTs >= r.status.CheckpointTs {
9998
r.status = newStatus
10099
}
@@ -124,12 +123,8 @@ func (r *SpanReplication) NewAddInferiorMessage(server node.ID) *messaging.Targe
124123
ChangefeedID: r.ChangefeedID.ID,
125124
Config: &heartbeatpb.DispatcherConfig{
126125
DispatcherID: r.ID.ToPB(),
127-
Span: &heartbeatpb.TableSpan{
128-
TableID: r.Span.TableID,
129-
StartKey: r.Span.StartKey,
130-
EndKey: r.Span.EndKey,
131-
},
132-
StartTs: r.status.CheckpointTs,
126+
Span: r.Span,
127+
StartTs: r.status.CheckpointTs,
133128
},
134129
ScheduleAction: heartbeatpb.ScheduleAction_Create,
135130
})

0 commit comments

Comments
 (0)