Skip to content

Commit b7bbfc3

Browse files
committed
move the remove logic to operator_controller.go
2 parents 52cd45c + 5d8bf8a commit b7bbfc3

File tree

5 files changed

+8
-13
lines changed

5 files changed

+8
-13
lines changed

maintainer/operator/operator_add.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,6 @@ func (m *AddDispatcherOperator) PostFinish() {
9494
}
9595

9696
func (m *AddDispatcherOperator) String() string {
97-
return fmt.Sprintf("add dispatcher operator: %s, dest:%s ",
97+
return fmt.Sprintf("add dispatcher operator: %s, dest:%s",
9898
m.replicaSet.ID, m.dest)
9999
}

maintainer/operator/operator_controller.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,7 @@ func (oc *Controller) AddOperator(op Operator) bool {
126126
}
127127
span := oc.replicationDB.GetTaskByID(op.ID())
128128
if span == nil {
129-
log.Warn("span not found",
129+
log.Warn("add operator failed, span not found",
130130
zap.String("changefeed", oc.changefeedID),
131131
zap.String("operator", op.String()))
132132
return false

maintainer/operator/operator_move.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,6 @@ func (m *MoveDispatcherOperator) String() string {
154154
m.lck.Lock()
155155
defer m.lck.Unlock()
156156

157-
return fmt.Sprintf("move dispatcher operator: %s, origin:%s, dest:%s ",
157+
return fmt.Sprintf("move dispatcher operator: %s, origin:%s, dest:%s",
158158
m.replicaSet.ID, m.origin, m.dest)
159159
}

maintainer/operator/operator_split.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,6 @@ func (m *SplitDispatcherOperator) Start() {
8383

8484
func (m *SplitDispatcherOperator) String() string {
8585
// todo add split region span
86-
return fmt.Sprintf("move dispatcher operator: %s, dest:%v ",
86+
return fmt.Sprintf("move dispatcher operator: %s, splitSpans:%v",
8787
m.replicaSet.ID, m.splitSpans)
8888
}

maintainer/replica/span_replication.go

+4-9
Original file line numberDiff line numberDiff line change
@@ -92,9 +92,8 @@ func NewWorkingReplicaSet(
9292
return r
9393
}
9494

95-
func (r *SpanReplication) UpdateStatus(status any) {
96-
if status != nil {
97-
newStatus := status.(*heartbeatpb.TableSpanStatus)
95+
func (r *SpanReplication) UpdateStatus(newStatus *heartbeatpb.TableSpanStatus) {
96+
if newStatus != nil {
9897
if newStatus.CheckpointTs >= r.status.CheckpointTs {
9998
r.status = newStatus
10099
}
@@ -124,12 +123,8 @@ func (r *SpanReplication) NewAddInferiorMessage(server node.ID) *messaging.Targe
124123
ChangefeedID: r.ChangefeedID.ID,
125124
Config: &heartbeatpb.DispatcherConfig{
126125
DispatcherID: r.ID.ToPB(),
127-
Span: &heartbeatpb.TableSpan{
128-
TableID: r.Span.TableID,
129-
StartKey: r.Span.StartKey,
130-
EndKey: r.Span.EndKey,
131-
},
132-
StartTs: r.status.CheckpointTs,
126+
Span: r.Span,
127+
StartTs: r.status.CheckpointTs,
133128
},
134129
ScheduleAction: heartbeatpb.ScheduleAction_Create,
135130
})

0 commit comments

Comments
 (0)