Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move the remove dispatcher logic to op controller #369

Merged
merged 3 commits into from
Oct 15, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions maintainer/maintainer.go
Original file line number Diff line number Diff line change
@@ -48,11 +48,17 @@ import (
"go.uber.org/zap"
)

// Maintainer is response for handle changefeed replication tasks, Maintainer should:
// 1. schedules tables to ticdc watcher
// Maintainer is responsible for handling changefeed replication tasks. Maintainer should:
// 1. schedules tables to dispatcher manager
// 2. calculate changefeed checkpoint ts
// 3. send changefeed status to coordinator
// 4. handle heartbeat reported by dispatcher
// there are four threads in maintainer:
// 1. controller thread, handled in dynstream, it handles the main logic of the maintainer, like barrier and heartbeat
// 2. scheduler thread, handled in threadpool, it schedules the tables to dispatcher manager
// 3. operator controller thread, handled in threadpool, it runs the operators
// 4. checker controller thread, handled in threadpool, it runs the checkers to dynamically adjust the schedule
// all threads read/write information from/to the ReplicationDB
type Maintainer struct {
id model.ChangeFeedID
config *config.ChangeFeedInfo
80 changes: 40 additions & 40 deletions maintainer/maintainer_controller.go
Original file line number Diff line number Diff line change
@@ -39,6 +39,7 @@ import (
)

// Controller schedules and balances tables
// there are 3 main components in the controller: scheduler, ReplicationDB and operator controller
type Controller struct {
// initialTables hold all tables that before controller bootstrapped
initialTables []commonEvent.Table
@@ -93,6 +94,37 @@ func NewController(changefeedID string,
return s
}

// HandleStatus handles the span statuses reported by the node `from`.
// For every reported status:
//   - if no task is found for the dispatcher id, the report is logged and ignored; when the
//     reported component status is Working, a remove message is sent back to the reporting node
//   - otherwise the status is first forwarded to the operator controller, and it is applied
//     to the task only when the task's node id equals the reporting node
func (c *Controller) HandleStatus(from node.ID, statusList []*heartbeatpb.TableSpanStatus) {
	for _, reported := range statusList {
		id := common.NewDispatcherIDFromPB(reported.ID)
		task := c.GetTask(id)
		if task == nil {
			log.Warn("no span found, ignore",
				zap.String("changefeed", c.changefeedID),
				zap.String("from", from.String()),
				zap.Any("status", reported),
				zap.String("span", id.String()))
			if reported.ComponentStatus == heartbeatpb.ComponentState_Working {
				// the span is unknown here but the dispatcher reports it as working,
				// so ask the reporting node to remove it; the send error is discarded
				_ = c.messageCenter.SendCommand(replica.NewRemoveInferiorMessage(from, c.changefeedID, reported.ID))
			}
			continue
		}

		// the operator controller sees the status before the node id check below
		c.operatorController.UpdateOperatorStatus(id, from, reported)

		if owner := task.GetNodeID(); owner != from {
			// todo: handle the case that the node id is mismatch
			log.Warn("node id not match",
				zap.String("changefeed", c.changefeedID),
				zap.Any("from", from),
				zap.Stringer("node", owner))
			continue
		}
		task.UpdateStatus(reported)
	}
}

// GetTasksBySchemaID returns the span replications that the ReplicationDB reports for the given schema id
func (c *Controller) GetTasksBySchemaID(schemaID int64) []*replica.SpanReplication {
return c.replicationDB.GetTasksBySchemaID(schemaID)
}
@@ -207,25 +239,22 @@ func (c *Controller) GetTask(dispatcherID common.DispatcherID) *replica.SpanRepl
return c.replicationDB.GetTaskByID(dispatcherID)
}

// RemoveAllTasks remove all tasks, todo: move these 3 methods to the operator controller
// RemoveAllTasks remove all tasks
func (c *Controller) RemoveAllTasks() {
for _, replicaSet := range c.replicationDB.TryRemoveAll() {
c.operatorController.ReplicaSetRemoved(operator.NewRemoveDispatcherOperator(c.replicationDB, replicaSet))
}
c.operatorController.RemoveAllTasks()
}

// RemoveTasksBySchemaID remove all tasks by schema id
func (c *Controller) RemoveTasksBySchemaID(schemaID int64) {
for _, replicaSet := range c.replicationDB.TryRemoveBySchemaID(schemaID) {
c.operatorController.ReplicaSetRemoved(operator.NewRemoveDispatcherOperator(c.replicationDB, replicaSet))
}
c.operatorController.RemoveTasksBySchemaID(schemaID)
}

// RemoveTasksByTableIDs remove all tasks by table id
func (c *Controller) RemoveTasksByTableIDs(tables ...int64) {
for _, replicaSet := range c.replicationDB.TryRemoveByTableIDs(tables...) {
c.operatorController.ReplicaSetRemoved(operator.NewRemoveDispatcherOperator(c.replicationDB, replicaSet))
}
c.operatorController.RemoveTasksByTableIDs(tables...)
}

// GetTasksByTableIDs get all tasks by table id
func (c *Controller) GetTasksByTableIDs(tableIDs ...int64) []*replica.SpanReplication {
return c.replicationDB.GetTasksByTableIDs(tableIDs...)
}
@@ -236,6 +265,7 @@ func (c *Controller) UpdateSchemaID(tableID, newSchemaID int64) {
c.replicationDB.UpdateSchemaID(tableID, newSchemaID)
}

// RemoveNode is called when a node is removed,
// it forwards the removed node id to the operator controller
func (c *Controller) RemoveNode(id node.ID) {
c.operatorController.OnNodeRemoved(id)
}
@@ -245,36 +275,6 @@ func (c *Controller) ScheduleFinished() bool {
return c.replicationDB.GetAbsentSize() == 0 && c.operatorController.OperatorSize() == 0
}

func (c *Controller) HandleStatus(from node.ID, statusList []*heartbeatpb.TableSpanStatus) {
for _, status := range statusList {
dispatcherID := common.NewDispatcherIDFromPB(status.ID)
stm := c.GetTask(dispatcherID)
if stm == nil {
log.Warn("no span found, ignore",
zap.String("changefeed", c.changefeedID),
zap.String("from", from.String()),
zap.Any("status", status),
zap.String("span", dispatcherID.String()))
if status.ComponentStatus == heartbeatpb.ComponentState_Working {
// if the span is not found, and the status is working, we need to remove it from dispatcher
_ = c.messageCenter.SendCommand(replica.NewRemoveInferiorMessage(from, c.changefeedID, status.ID))
}
continue
}
c.operatorController.UpdateOperatorStatus(dispatcherID, from, status)
nodeID := stm.GetNodeID()
if nodeID != from {
// todo: handle the case that the node id is mismatch
log.Warn("node id not match",
zap.String("changefeed", c.changefeedID),
zap.Any("from", from),
zap.Stringer("node", nodeID))
continue
}
stm.UpdateStatus(status)
}
}

// TaskSize returns the task size reported by the ReplicationDB
func (c *Controller) TaskSize() int {
return c.replicationDB.TaskSize()
}
2 changes: 1 addition & 1 deletion maintainer/operator/operator_add.go
Original file line number Diff line number Diff line change
@@ -94,6 +94,6 @@ func (m *AddDispatcherOperator) PostFinish() {
}

func (m *AddDispatcherOperator) String() string {
return fmt.Sprintf("add dispatcher operator: %s, dest:%s ",
return fmt.Sprintf("add dispatcher operator: %s, dest:%s",
m.replicaSet.ID, m.dest)
}
54 changes: 42 additions & 12 deletions maintainer/operator/operator_controller.go
Original file line number Diff line number Diff line change
@@ -27,6 +27,8 @@ import (
"go.uber.org/zap"
)

// Controller is the operator controller, it manages all operators.
// And the Controller is responsible for the execution of the operator.
type Controller struct {
changefeedID string
replicationDB *replica.ReplicationDB
@@ -80,21 +82,35 @@ func (oc *Controller) Execute() time.Time {
}
}

// ReplicaSetRemoved if the replica set is removed,
// the controller will remove the operator. add a new operator to the controller.
func (oc *Controller) ReplicaSetRemoved(op *RemoveDispatcherOperator) {
// RemoveAllTasks removes all tasks, and notifies all operators to stop.
// it is only called by the barrier when the changefeed is stopped.
func (oc *Controller) RemoveAllTasks() {
oc.lock.Lock()
defer oc.lock.Unlock()

if old, ok := oc.operators[op.ID()]; ok {
log.Info("replica set is removed , replace the old one",
zap.String("changefeed", oc.changefeedID),
zap.String("replicaset", op.ID().String()),
zap.String("operator", op.String()))
old.OnTaskRemoved()
delete(oc.operators, op.ID())
for _, replicaSet := range oc.replicationDB.TryRemoveAll() {
oc.removeReplicaSet(NewRemoveDispatcherOperator(oc.replicationDB, replicaSet))
}
}

// RemoveTasksBySchemaID removes all tasks that belong to the given schema id.
// it is only called by the barrier when the schema is dropped by ddl.
// The controller lock is held for the whole call; every replica set returned by
// ReplicationDB.TryRemoveBySchemaID gets a remove dispatcher operator registered for it.
func (oc *Controller) RemoveTasksBySchemaID(schemaID int64) {
	oc.lock.Lock()
	defer oc.lock.Unlock()

	removed := oc.replicationDB.TryRemoveBySchemaID(schemaID)
	for _, rs := range removed {
		removeOp := NewRemoveDispatcherOperator(oc.replicationDB, rs)
		oc.removeReplicaSet(removeOp)
	}
}

// RemoveTasksByTableIDs removes all tasks by table ids.
// it is only called by the barrier when the tables are dropped by ddl
func (oc *Controller) RemoveTasksByTableIDs(tables ...int64) {
oc.lock.Lock()
defer oc.lock.Unlock()
for _, replicaSet := range oc.replicationDB.TryRemoveByTableIDs(tables...) {
oc.removeReplicaSet(NewRemoveDispatcherOperator(oc.replicationDB, replicaSet))
}
oc.operators[op.ID()] = op
}

// AddOperator adds an operator to the controller, if the operator already exists, return false.
@@ -110,7 +126,7 @@ func (oc *Controller) AddOperator(op Operator) bool {
}
span := oc.replicationDB.GetTaskByID(op.ID())
if span == nil {
log.Warn("span not found",
log.Warn("add operator failed, span not found",
zap.String("changefeed", oc.changefeedID),
zap.String("operator", op.String()))
return false
@@ -198,3 +214,17 @@ func (oc *Controller) pollQueueingOperator() (Operator, bool) {
heap.Push(&oc.runningQueue, item)
return op, true
}

// removeReplicaSet registers op as the operator of its replica set.
// If another operator is already registered under the same id, it is notified
// through OnTaskRemoved and dropped before op takes its place.
// This method does not acquire oc.lock itself.
func (oc *Controller) removeReplicaSet(op *RemoveDispatcherOperator) {
	id := op.ID()
	if prev, exists := oc.operators[id]; exists {
		log.Info("replica set is removed , replace the old one",
			zap.String("changefeed", oc.changefeedID),
			zap.String("replicaset", id.String()),
			zap.String("operator", op.String()))
		// let the replaced operator know its task is gone before discarding it
		prev.OnTaskRemoved()
		delete(oc.operators, id)
	}
	oc.operators[id] = op
}
2 changes: 1 addition & 1 deletion maintainer/operator/operator_move.go
Original file line number Diff line number Diff line change
@@ -154,6 +154,6 @@ func (m *MoveDispatcherOperator) String() string {
m.lck.Lock()
defer m.lck.Unlock()

return fmt.Sprintf("move dispatcher operator: %s, origin:%s, dest:%s ",
return fmt.Sprintf("move dispatcher operator: %s, origin:%s, dest:%s",
m.replicaSet.ID, m.origin, m.dest)
}
2 changes: 1 addition & 1 deletion maintainer/operator/operator_split.go
Original file line number Diff line number Diff line change
@@ -83,6 +83,6 @@ func (m *SplitDispatcherOperator) Start() {

func (m *SplitDispatcherOperator) String() string {
// todo add split region span
return fmt.Sprintf("move dispatcher operator: %s, dest:%v ",
return fmt.Sprintf("move dispatcher operator: %s, splitSpans:%v",
m.replicaSet.ID, m.splitSpans)
}
13 changes: 4 additions & 9 deletions maintainer/replica/span_replication.go
Original file line number Diff line number Diff line change
@@ -92,9 +92,8 @@ func NewWorkingReplicaSet(
return r
}

func (r *SpanReplication) UpdateStatus(status any) {
if status != nil {
newStatus := status.(*heartbeatpb.TableSpanStatus)
func (r *SpanReplication) UpdateStatus(newStatus *heartbeatpb.TableSpanStatus) {
if newStatus != nil {
if newStatus.CheckpointTs >= r.status.CheckpointTs {
r.status = newStatus
}
@@ -124,12 +123,8 @@ func (r *SpanReplication) NewAddInferiorMessage(server node.ID) *messaging.Targe
ChangefeedID: r.ChangefeedID.ID,
Config: &heartbeatpb.DispatcherConfig{
DispatcherID: r.ID.ToPB(),
Span: &heartbeatpb.TableSpan{
TableID: r.Span.TableID,
StartKey: r.Span.StartKey,
EndKey: r.Span.EndKey,
},
StartTs: r.status.CheckpointTs,
Span: r.Span,
StartTs: r.status.CheckpointTs,
},
ScheduleAction: heartbeatpb.ScheduleAction_Create,
})