Skip to content

Commit 5d8bf8a

Browse files
authored
update some filed name (#368)
1 parent 0fe6b9d commit 5d8bf8a

11 files changed

+200
-183
lines changed

maintainer/barrier_event_test.go

+8-8
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ func TestScheduleEvent(t *testing.T) {
4242
})
4343
event.scheduleBlockEvent()
4444
//drop table will be executed first
45-
require.Equal(t, 2, controller.db.GetAbsentSize())
45+
require.Equal(t, 2, controller.replicationDB.GetAbsentSize())
4646

4747
event = NewBlockEvent("test", controller, &heartbeatpb.State{
4848
IsBlocked: true,
@@ -55,7 +55,7 @@ func TestScheduleEvent(t *testing.T) {
5555
})
5656
event.scheduleBlockEvent()
5757
//drop table will be executed first, then add the new table
58-
require.Equal(t, 1, controller.db.GetAbsentSize())
58+
require.Equal(t, 1, controller.replicationDB.GetAbsentSize())
5959

6060
event = NewBlockEvent("test", controller, &heartbeatpb.State{
6161
IsBlocked: true,
@@ -68,7 +68,7 @@ func TestScheduleEvent(t *testing.T) {
6868
})
6969
event.scheduleBlockEvent()
7070
//drop table will be executed first, then add the new table
71-
require.Equal(t, 1, controller.db.GetAbsentSize())
71+
require.Equal(t, 1, controller.replicationDB.GetAbsentSize())
7272
}
7373

7474
func TestResendAction(t *testing.T) {
@@ -79,10 +79,10 @@ func TestResendAction(t *testing.T) {
7979
controller.AddNewTable(commonEvent.Table{SchemaID: 1, TableID: 1}, 1)
8080
controller.AddNewTable(commonEvent.Table{SchemaID: 1, TableID: 2}, 1)
8181
var dispatcherIDs []common.DispatcherID
82-
absents, _ := controller.db.GetScheduleSate(make([]*replica.SpanReplication, 0), 100)
82+
absents, _ := controller.replicationDB.GetScheduleSate(make([]*replica.SpanReplication, 0), 100)
8383
for _, stm := range absents {
84-
controller.db.BindSpanToNode("", "node1", stm)
85-
controller.db.MarkSpanReplicating(stm)
84+
controller.replicationDB.BindSpanToNode("", "node1", stm)
85+
controller.replicationDB.MarkSpanReplicating(stm)
8686
dispatcherIDs = append(dispatcherIDs, stm.ID)
8787
}
8888
event := NewBlockEvent("test", controller, &heartbeatpb.State{
@@ -172,7 +172,7 @@ func TestUpdateSchemaID(t *testing.T) {
172172
setNodeManagerAndMessageCenter()
173173
controller := NewController("test", 1, nil, nil, nil, nil, 1000, 0)
174174
controller.AddNewTable(commonEvent.Table{SchemaID: 1, TableID: 1}, 1)
175-
require.Equal(t, 1, controller.db.GetAbsentSize())
175+
require.Equal(t, 1, controller.replicationDB.GetAbsentSize())
176176
require.Len(t, controller.GetTasksBySchemaID(1), 1)
177177
event := NewBlockEvent("test", controller, &heartbeatpb.State{
178178
IsBlocked: true,
@@ -189,7 +189,7 @@ func TestUpdateSchemaID(t *testing.T) {
189189
}},
190190
)
191191
event.scheduleBlockEvent()
192-
require.Equal(t, 1, controller.db.GetAbsentSize())
192+
require.Equal(t, 1, controller.replicationDB.GetAbsentSize())
193193
// check the schema id and map is updated
194194
require.Len(t, controller.GetTasksBySchemaID(1), 0)
195195
require.Len(t, controller.GetTasksBySchemaID(2), 1)

maintainer/barrier_test.go

+23-23
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,8 @@ func TestOneBlockEvent(t *testing.T) {
3333
controller := NewController("test", 1, nil, nil, nil, nil, 1000, 0)
3434
controller.AddNewTable(commonEvent.Table{SchemaID: 1, TableID: 1}, 0)
3535
stm := controller.GetTasksByTableIDs(1)[0]
36-
controller.db.BindSpanToNode("", "node1", stm)
37-
controller.db.MarkSpanReplicating(stm)
36+
controller.replicationDB.BindSpanToNode("", "node1", stm)
37+
controller.replicationDB.MarkSpanReplicating(stm)
3838
barrier := NewBarrier(controller)
3939
msg := barrier.HandleStatus("node1", &heartbeatpb.BlockStatusRequest{
4040
ChangefeedID: "test",
@@ -101,14 +101,14 @@ func TestNormalBlock(t *testing.T) {
101101
controller.AddNewTable(commonEvent.Table{SchemaID: 1, TableID: int64(id)}, 0)
102102
stm := controller.GetTasksByTableIDs(int64(id))[0]
103103
blockedDispatcherIDS = append(blockedDispatcherIDS, stm.ID.ToPB())
104-
controller.db.BindSpanToNode("", "node1", stm)
105-
controller.db.MarkSpanReplicating(stm)
104+
controller.replicationDB.BindSpanToNode("", "node1", stm)
105+
controller.replicationDB.MarkSpanReplicating(stm)
106106
}
107107

108108
// the last one is the writer
109109
var selectDispatcherID = common.NewDispatcherIDFromPB(blockedDispatcherIDS[2])
110110
selectedRep := controller.GetTask(selectDispatcherID)
111-
controller.db.BindSpanToNode("node1", "node2", selectedRep)
111+
controller.replicationDB.BindSpanToNode("node1", "node2", selectedRep)
112112
dropID := selectedRep.Span.TableID
113113

114114
newSpan := &heartbeatpb.Table{TableID: 10, SchemaID: 1}
@@ -284,13 +284,13 @@ func TestSchemaBlock(t *testing.T) {
284284
controller.AddNewTable(commonEvent.Table{SchemaID: 2, TableID: 3}, 1)
285285
var dispatcherIDs []*heartbeatpb.DispatcherID
286286
var dropTables = []int64{1, 2}
287-
absents, _ := controller.db.GetScheduleSate(make([]*replica.SpanReplication, 0), 100)
287+
absents, _ := controller.replicationDB.GetScheduleSate(make([]*replica.SpanReplication, 0), 100)
288288
for _, stm := range absents {
289-
if stm.SchemaID == 1 {
289+
if stm.GetSchemaID() == 1 {
290290
dispatcherIDs = append(dispatcherIDs, stm.ID.ToPB())
291291
}
292-
controller.db.BindSpanToNode("", "node1", stm)
293-
controller.db.MarkSpanReplicating(stm)
292+
controller.replicationDB.BindSpanToNode("", "node1", stm)
293+
controller.replicationDB.MarkSpanReplicating(stm)
294294
}
295295

296296
newTable := &heartbeatpb.Table{TableID: 10, SchemaID: 2}
@@ -412,17 +412,17 @@ func TestSchemaBlock(t *testing.T) {
412412
require.Len(t, barrier.blockedTs, 1)
413413
// the writer already advanced
414414
require.Len(t, event.reportedDispatchers, 1)
415-
require.Equal(t, 1, controller.db.GetAbsentSize())
416-
require.Equal(t, 2, controller.oc.OperatorSize())
415+
require.Equal(t, 1, controller.replicationDB.GetAbsentSize())
416+
require.Equal(t, 2, controller.operatorController.OperatorSize())
417417
// two dispatcher and moved to operator queue, operator will be removed after ack
418-
require.Equal(t, 3, controller.db.GetReplicatingSize())
419-
for _, task := range controller.db.GetReplicating() {
420-
op := controller.oc.GetOperator(task.ID)
418+
require.Equal(t, 3, controller.replicationDB.GetReplicatingSize())
419+
for _, task := range controller.replicationDB.GetReplicating() {
420+
op := controller.operatorController.GetOperator(task.ID)
421421
if op != nil {
422422
op.PostFinish()
423423
}
424424
}
425-
require.Equal(t, 1, controller.db.GetReplicatingSize())
425+
require.Equal(t, 1, controller.replicationDB.GetReplicatingSize())
426426

427427
// other dispatcher advanced checkpoint ts
428428
msg = barrier.HandleStatus("node1", &heartbeatpb.BlockStatusRequest{
@@ -455,15 +455,15 @@ func TestSyncPointBlock(t *testing.T) {
455455
controller.AddNewTable(commonEvent.Table{SchemaID: 2, TableID: 3}, 1)
456456
var dispatcherIDs []*heartbeatpb.DispatcherID
457457
var dropTables = []int64{1, 2, 3}
458-
absents, _ := controller.db.GetScheduleSate(make([]*replica.SpanReplication, 0), 10000)
458+
absents, _ := controller.replicationDB.GetScheduleSate(make([]*replica.SpanReplication, 0), 10000)
459459
for _, stm := range absents {
460460
dispatcherIDs = append(dispatcherIDs, stm.ID.ToPB())
461-
controller.db.BindSpanToNode("", "node1", stm)
462-
controller.db.MarkSpanReplicating(stm)
461+
controller.replicationDB.BindSpanToNode("", "node1", stm)
462+
controller.replicationDB.MarkSpanReplicating(stm)
463463
}
464464
var selectDispatcherID = common.NewDispatcherIDFromPB(dispatcherIDs[2])
465465
selectedRep := controller.GetTask(selectDispatcherID)
466-
controller.db.BindSpanToNode("node1", "node2", selectedRep)
466+
controller.replicationDB.BindSpanToNode("node1", "node2", selectedRep)
467467

468468
newSpan := &heartbeatpb.Table{TableID: 10, SchemaID: 2}
469469
barrier := NewBarrier(controller)
@@ -631,7 +631,7 @@ func TestNonBlocked(t *testing.T) {
631631
require.True(t, heartbeatpb.InfluenceType_Normal == resp.DispatcherStatuses[0].InfluencedDispatchers.InfluenceType)
632632
require.Equal(t, resp.DispatcherStatuses[0].InfluencedDispatchers.DispatcherIDs[0], blockedDispatcherIDS[0])
633633
require.Len(t, barrier.blockedTs, 0)
634-
require.Equal(t, 2, barrier.controller.db.GetAbsentSize(), 2)
634+
require.Equal(t, 2, barrier.controller.replicationDB.GetAbsentSize(), 2)
635635
}
636636

637637
func TestSyncPointBlockPerf(t *testing.T) {
@@ -642,10 +642,10 @@ func TestSyncPointBlockPerf(t *testing.T) {
642642
controller.AddNewTable(commonEvent.Table{SchemaID: 1, TableID: int64(id)}, 1)
643643
}
644644
var dispatcherIDs []*heartbeatpb.DispatcherID
645-
absent, _ := controller.db.GetScheduleSate(make([]*replica.SpanReplication, 0), 10000)
645+
absent, _ := controller.replicationDB.GetScheduleSate(make([]*replica.SpanReplication, 0), 10000)
646646
for _, stm := range absent {
647-
controller.db.BindSpanToNode("", "node1", stm)
648-
controller.db.MarkSpanReplicating(stm)
647+
controller.replicationDB.BindSpanToNode("", "node1", stm)
648+
controller.replicationDB.MarkSpanReplicating(stm)
649649
dispatcherIDs = append(dispatcherIDs, stm.ID.ToPB())
650650
}
651651
var blockStatus []*heartbeatpb.TableSpanBlockStatus

maintainer/maintainer.go

+5-6
Original file line numberDiff line numberDiff line change
@@ -304,7 +304,7 @@ func (m *Maintainer) onInit() bool {
304304
func (m *Maintainer) onMessage(msg *messaging.TargetMessage) error {
305305
switch msg.Type {
306306
case messaging.TypeHeartBeatRequest:
307-
return m.onHeartBeatRequest(msg)
307+
m.onHeartBeatRequest(msg)
308308
case messaging.TypeBlockStatusRequest:
309309
m.onBlockStateRequest(msg)
310310
case messaging.TypeMaintainerBootstrapResponse:
@@ -436,7 +436,7 @@ func (m *Maintainer) sendMessages(msgs []*messaging.TargetMessage) {
436436
}
437437
}
438438

439-
func (m *Maintainer) onHeartBeatRequest(msg *messaging.TargetMessage) error {
439+
func (m *Maintainer) onHeartBeatRequest(msg *messaging.TargetMessage) {
440440
req := msg.Message[0].(*heartbeatpb.HeartBeatRequest)
441441
if req.Watermark != nil {
442442
m.checkpointTsByCapture[msg.From] = *req.Watermark
@@ -452,7 +452,6 @@ func (m *Maintainer) onHeartBeatRequest(msg *messaging.TargetMessage) error {
452452
m.runningErrors[msg.From] = req.Err
453453
m.errLock.Unlock()
454454
}
455-
return nil
456455
}
457456

458457
func (m *Maintainer) onBlockStateRequest(msg *messaging.TargetMessage) {
@@ -639,9 +638,9 @@ func (m *Maintainer) onPeriodTask() {
639638
func (m *Maintainer) collectMetrics() {
640639
if time.Since(m.lastPrintStatusTime) > time.Second*20 {
641640
total := m.controller.TaskSize()
642-
scheduling := m.controller.db.GetAbsentSize()
643-
working := m.controller.db.GetReplicatingSize()
644-
absent := m.controller.db.GetAbsentSize()
641+
scheduling := m.controller.replicationDB.GetAbsentSize()
642+
working := m.controller.replicationDB.GetReplicatingSize()
643+
absent := m.controller.replicationDB.GetAbsentSize()
645644

646645
m.tableCountGauge.Set(float64(total))
647646
m.scheduledTaskGauge.Set(float64(scheduling))

0 commit comments

Comments
 (0)