Skip to content

Commit 62962fa

Browse files
authored
fix barrier block logic (#358)
* fix barrier block logic * fix barrier block logic
1 parent d3c9ba0 commit 62962fa

File tree

3 files changed

+68
-33
lines changed

3 files changed

+68
-33
lines changed

maintainer/barrier.go

+5-22
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,9 @@ import (
2323
// Barrier manages the block events for the changefeed
2424
// the block event processing logic:
2525
// 1. dispatcher report an event to maintainer, like ddl, sync point
26-
// 2. maintainer wait for all dispatchers to reach the same commit ts (all dispatchers will report the same event)
26+
// 2. maintainer wait for all dispatchers reporting block event (all dispatchers will report the same event)
2727
// 3. maintainer choose one dispatcher to write (take an action) the event to downstream, (resend logic is needed)
28-
// 4. maintainer wait for the selected dispatcher advance its checkpoint ts,(means it already finished the write action), (resend logic is needed)
28+
// 4. maintainer wait for the selected dispatcher to advance its checkpoint ts so that checkpoint ts >= block ts (which means it has already finished the write action), (resend logic is needed)
2929
// 5. maintainer send pass action to all other dispatchers. (resend logic is needed)
3030
// 6. maintainer wait for all dispatchers advance checkpoints, and cleanup memory
3131
type Barrier struct {
@@ -90,25 +90,10 @@ func (b *Barrier) handleNoStateHeartbeat(dispatcherID common.DispatcherID, check
9090
if !ok {
9191
return nil
9292
}
93-
// no block event send ,but reached the block point
94-
if checkpointTs == event.commitTs {
95-
action := event.dispatcherReachedBlockTs(dispatcherID)
96-
// all dispatcher reported heartbeat, select one to write
97-
if action != nil {
98-
dispatcherStatus := &heartbeatpb.DispatcherStatus{
99-
InfluencedDispatchers: &heartbeatpb.InfluencedDispatchers{
100-
InfluenceType: heartbeatpb.InfluenceType_Normal,
101-
DispatcherIDs: []*heartbeatpb.DispatcherID{event.writerDispatcher.ToPB()},
102-
},
103-
Action: action,
104-
}
105-
return dispatcherStatus
106-
}
107-
}
10893

10994
// there is a block event and the dispatcher advanced its checkpoint ts
11095
// which means we have sent pass or write action to it
111-
if checkpointTs > event.commitTs {
96+
if checkpointTs >= event.commitTs {
11297
// the writer already synced ddl to downstream
11398
if event.writerDispatcher == dispatcherID {
11499
// schedule new and removed tasks
@@ -143,10 +128,8 @@ func (b *Barrier) handleStateHeartbeat(changefeedID string,
143128
if blockState.IsBlocked {
144129
// insert a new event, or get the existing one if the event is already tracked
145130
event := b.getOrInsertNewEvent(changefeedID, dispatcherID, blockState)
146-
// the dispatcher already reached the block event block ts, check whether we need to send write action
147-
if status.CheckpointTs == blockState.BlockTs {
148-
dispatcherStatus.Action = event.dispatcherReachedBlockTs(dispatcherID)
149-
}
131+
// check if all dispatchers already reported the block event, and check whether we need to send write action
132+
dispatcherStatus.Action = event.dispatcherReachedBlockTs(dispatcherID)
150133
} else {
151134
// it's not a blocked event, it must be sent by table event trigger dispatcher
152135
// the ddl is already synced to downstream, e.g.: create table, drop table

maintainer/barrier_event.go

+10-9
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ type BarrierEvent struct {
4343
newTables []*heartbeatpb.Table
4444
schemaIDChange []*heartbeatpb.SchemaIDChange
4545

46-
advancedDispatchers map[common.DispatcherID]bool
46+
reportedDispatchers map[common.DispatcherID]bool
4747
lastResendTime time.Time
4848
}
4949

@@ -58,7 +58,7 @@ func NewBlockEvent(cfID string, scheduler *Controller,
5858
newTables: status.NeedAddedTables,
5959
dropDispatchers: status.NeedDroppedTables,
6060
schemaIDChange: status.UpdatedSchemas,
61-
advancedDispatchers: make(map[common.DispatcherID]bool),
61+
reportedDispatchers: make(map[common.DispatcherID]bool),
6262
lastResendTime: time.Time{},
6363
}
6464
if event.blockedDispatchers != nil && event.blockedDispatchers.InfluenceType == heartbeatpb.InfluenceType_Normal {
@@ -122,12 +122,12 @@ func (be *BarrierEvent) allDispatcherReported() bool {
122122
// todo: we should check ddl dispatcher checkpoint ts here, because the size may be affected by ddls?
123123
switch be.blockedDispatchers.InfluenceType {
124124
case heartbeatpb.InfluenceType_DB:
125-
return len(be.advancedDispatchers) >=
125+
return len(be.reportedDispatchers) >=
126126
len(be.scheduler.GetTasksBySchemaID(be.blockedDispatchers.SchemaID))
127127
case heartbeatpb.InfluenceType_All:
128-
return len(be.advancedDispatchers) >= be.scheduler.TaskSize()
128+
return len(be.reportedDispatchers) >= be.scheduler.TaskSize()
129129
case heartbeatpb.InfluenceType_Normal:
130-
return len(be.advancedDispatchers) >= len(be.blockedTasks)
130+
return len(be.reportedDispatchers) >= len(be.blockedTasks)
131131
}
132132
return false
133133
}
@@ -179,18 +179,19 @@ func (be *BarrierEvent) sendPassAction() []*messaging.TargetMessage {
179179
return msgs
180180
}
181181

182-
// dispatcherReachedBlockTs check if all the dispatchers reached the block ts
182+
// dispatcherReachedBlockTs checks whether all the dispatchers have reported the block event,
183+
// if so, select one dispatcher to write, currently choose the last one
183184
func (be *BarrierEvent) dispatcherReachedBlockTs(dispatcherID common.DispatcherID) *heartbeatpb.DispatcherAction {
184185
if be.selected {
185186
return nil
186187
}
187-
be.advancedDispatchers[dispatcherID] = true
188+
be.reportedDispatchers[dispatcherID] = true
188189
// all dispatcher reported heartbeat, select the last one to write
189190
if be.allDispatcherReported() {
190191
be.writerDispatcher = dispatcherID
191192
be.selected = true
192193
// release memory
193-
be.advancedDispatchers = nil
194+
be.reportedDispatchers = nil
194195

195196
log.Info("all dispatcher reported heartbeat, select one to write",
196197
zap.String("changefeed", be.cfID),
@@ -214,7 +215,7 @@ func (be *BarrierEvent) resend() []*messaging.TargetMessage {
214215
return nil
215216
}
216217
be.lastResendTime = time.Now()
217-
// we select a dispatcher as the writer, still waiting for that dispatcher advance it's checkpoint ts
218+
// we select a dispatcher as the writer, still waiting for that dispatcher advance its checkpoint ts
218219
if !be.writerDispatcherAdvanced {
219220
//resend write action
220221
stm := be.scheduler.GetTask(be.writerDispatcher)

maintainer/barrier_test.go

+53-2
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,57 @@ import (
2626
"go.uber.org/zap"
2727
)
2828

29+
func TestOneBlockEvent(t *testing.T) {
30+
sche := NewController("test", 1, nil, nil, nil, 1000, 0)
31+
sche.AddNewNode("node1")
32+
sche.AddNewTable(common.Table{1, 1}, 0)
33+
stm := sche.GetTasksByTableIDs(1)[0]
34+
stm.Primary = "node1"
35+
stm.State = scheduler.SchedulerStatusWorking
36+
sche.tryMoveTask(stm.ID, stm, scheduler.SchedulerStatusAbsent, "", true)
37+
barrier := NewBarrier(sche)
38+
msg := barrier.HandleStatus("node1", &heartbeatpb.HeartBeatRequest{
39+
ChangefeedID: "test",
40+
Statuses: []*heartbeatpb.TableSpanStatus{
41+
{
42+
ID: stm.ID.ToPB(),
43+
State: &heartbeatpb.State{
44+
IsBlocked: true,
45+
BlockTs: 10,
46+
BlockTables: &heartbeatpb.InfluencedTables{
47+
InfluenceType: heartbeatpb.InfluenceType_All,
48+
},
49+
},
50+
CheckpointTs: 0,
51+
},
52+
},
53+
})
54+
require.NotNil(t, msg)
55+
resp := msg.Message[0].(*heartbeatpb.HeartBeatResponse)
56+
require.Equal(t, barrier.blockedDispatcher[stm.ID], barrier.blockedTs[10])
57+
event := barrier.blockedTs[10]
58+
require.Equal(t, uint64(10), event.commitTs)
59+
require.True(t, event.writerDispatcher == stm.ID)
60+
require.True(t, event.selected)
61+
require.False(t, event.writerDispatcherAdvanced)
62+
require.Nil(t, event.reportedDispatchers)
63+
require.Equal(t, resp.DispatcherStatuses[0].Ack.CommitTs, uint64(10))
64+
require.Equal(t, resp.DispatcherStatuses[0].Action.CommitTs, uint64(10))
65+
require.Equal(t, resp.DispatcherStatuses[0].Action.Action, heartbeatpb.Action_Write)
66+
67+
msg = barrier.HandleStatus("node1", &heartbeatpb.HeartBeatRequest{
68+
ChangefeedID: "test",
69+
Statuses: []*heartbeatpb.TableSpanStatus{
70+
{
71+
ID: stm.ID.ToPB(),
72+
CheckpointTs: 10,
73+
},
74+
},
75+
})
76+
require.Len(t, barrier.blockedTs, 0)
77+
require.Len(t, barrier.blockedDispatcher, 0)
78+
}
79+
2980
func TestNormalBlock(t *testing.T) {
3081
sche := NewController("test", 1, nil, nil, nil, 1000, 0)
3182
sche.AddNewNode("node1")
@@ -121,7 +172,7 @@ func TestNormalBlock(t *testing.T) {
121172
event := barrier.blockedTs[10]
122173
require.Equal(t, uint64(10), event.commitTs)
123174
require.True(t, event.writerDispatcher == selectDispatcherID)
124-
require.Nil(t, event.advancedDispatchers)
175+
require.Nil(t, event.reportedDispatchers)
125176

126177
// repeated status
127178
barrier.HandleStatus("node1", &heartbeatpb.HeartBeatRequest{
@@ -165,7 +216,7 @@ func TestNormalBlock(t *testing.T) {
165216
})
166217
require.Equal(t, uint64(10), event.commitTs)
167218
require.True(t, event.writerDispatcher == selectDispatcherID)
168-
require.Nil(t, event.advancedDispatchers)
219+
require.Nil(t, event.reportedDispatchers)
169220

170221
// selected node write done
171222
msg = barrier.HandleStatus("node2", &heartbeatpb.HeartBeatRequest{

0 commit comments

Comments
 (0)