Skip to content

Commit 4f149bb

Browse files
authored
fix IsSycPoint is not set when resend write action (#361)
1 parent b1d8313 commit 4f149bb

File tree

4 files changed

+120
-108
lines changed

4 files changed

+120
-108
lines changed

maintainer/barrier.go

+9-1
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,7 @@ func (b *Barrier) handleBlockState(changefeedID string,
127127
dispatcherID.ToPB(),
128128
},
129129
},
130-
Ack: &heartbeatpb.ACK{CommitTs: blockState.BlockTs, IsSyncPoint: blockState.IsSyncPoint},
130+
Ack: ackEvent(blockState.BlockTs, blockState.IsSyncPoint),
131131
}
132132
if blockState.IsBlocked {
133133
key := getEventKey(blockState)
@@ -155,6 +155,14 @@ func (b *Barrier) getOrInsertNewEvent(changefeedID string, key eventKey,
155155
return event
156156
}
157157

158+
// ackEvent creates an ack event
159+
func ackEvent(commitTs uint64, isSyncPoint bool) *heartbeatpb.ACK {
160+
return &heartbeatpb.ACK{
161+
CommitTs: commitTs,
162+
IsSyncPoint: isSyncPoint,
163+
}
164+
}
165+
158166
// getEventKey returns the key of the block event
159167
func getEventKey(blockState *heartbeatpb.State) eventKey {
160168
return eventKey{

maintainer/barrier_event.go

+27-30
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ import (
3131
type BarrierEvent struct {
3232
cfID string
3333
commitTs uint64
34-
scheduler *Controller
34+
controller *Controller
3535
selected bool
3636
writerDispatcher common.DispatcherID
3737
writerDispatcherAdvanced bool
@@ -49,10 +49,10 @@ type BarrierEvent struct {
4949
lastResendTime time.Time
5050
}
5151

52-
func NewBlockEvent(cfID string, scheduler *Controller,
52+
func NewBlockEvent(cfID string, controller *Controller,
5353
status *heartbeatpb.State) *BarrierEvent {
5454
event := &BarrierEvent{
55-
scheduler: scheduler,
55+
controller: controller,
5656
selected: false,
5757
cfID: cfID,
5858
commitTs: status.BlockTs,
@@ -65,10 +65,10 @@ func NewBlockEvent(cfID string, scheduler *Controller,
6565
isSyncPoint: status.IsSyncPoint,
6666
}
6767
if event.blockedDispatchers != nil && event.blockedDispatchers.InfluenceType == heartbeatpb.InfluenceType_Normal {
68-
event.blockedTasks = event.scheduler.GetTasksByTableIDs(event.blockedDispatchers.TableIDs...)
68+
event.blockedTasks = event.controller.GetTasksByTableIDs(event.blockedDispatchers.TableIDs...)
6969
}
7070
if event.dropDispatchers != nil && event.dropDispatchers.InfluenceType == heartbeatpb.InfluenceType_Normal {
71-
event.dropTasks = event.scheduler.GetTasksByTableIDs(event.dropDispatchers.TableIDs...)
71+
event.dropTasks = event.controller.GetTasksByTableIDs(event.dropDispatchers.TableIDs...)
7272
}
7373
return event
7474
}
@@ -78,30 +78,30 @@ func (be *BarrierEvent) scheduleBlockEvent() {
7878
if be.dropDispatchers != nil {
7979
switch be.dropDispatchers.InfluenceType {
8080
case heartbeatpb.InfluenceType_DB:
81-
for _, stm := range be.scheduler.GetTasksBySchemaID(be.dropDispatchers.SchemaID) {
81+
for _, stm := range be.controller.GetTasksBySchemaID(be.dropDispatchers.SchemaID) {
8282
log.Info(" remove table",
8383
zap.String("changefeed", be.cfID),
8484
zap.String("table", stm.ID.String()))
85-
be.scheduler.RemoveTask(stm)
85+
be.controller.RemoveTask(stm)
8686
}
8787
case heartbeatpb.InfluenceType_Normal:
8888
for _, stm := range be.dropTasks {
8989
log.Info(" remove table",
9090
zap.String("changefeed", be.cfID),
9191
zap.String("table", stm.ID.String()))
92-
be.scheduler.RemoveTask(stm)
92+
be.controller.RemoveTask(stm)
9393
}
9494
case heartbeatpb.InfluenceType_All:
9595
log.Info("remove all tables by barrier", zap.String("changefeed", be.cfID))
96-
be.scheduler.RemoveAllTasks()
96+
be.controller.RemoveAllTasks()
9797
}
9898
}
9999
for _, add := range be.newTables {
100100
log.Info(" add new table",
101101
zap.String("changefeed", be.cfID),
102102
zap.Int64("schema", add.SchemaID),
103103
zap.Int64("table", add.TableID))
104-
be.scheduler.AddNewTable(commonEvent.Table{
104+
be.controller.AddNewTable(commonEvent.Table{
105105
SchemaID: add.SchemaID,
106106
TableID: add.TableID,
107107
}, be.commitTs)
@@ -113,7 +113,7 @@ func (be *BarrierEvent) scheduleBlockEvent() {
113113
zap.Int64("newSchema", change.OldSchemaID),
114114
zap.Int64("oldSchema", change.NewSchemaID),
115115
zap.Int64("table", change.TableID))
116-
be.scheduler.UpdateSchemaID(change.TableID, change.NewSchemaID)
116+
be.controller.UpdateSchemaID(change.TableID, change.NewSchemaID)
117117
}
118118
}
119119

@@ -134,9 +134,9 @@ func (be *BarrierEvent) allDispatcherReported() bool {
134134
switch be.blockedDispatchers.InfluenceType {
135135
case heartbeatpb.InfluenceType_DB:
136136
return len(be.reportedDispatchers) >=
137-
len(be.scheduler.GetTasksBySchemaID(be.blockedDispatchers.SchemaID))
137+
len(be.controller.GetTasksBySchemaID(be.blockedDispatchers.SchemaID))
138138
case heartbeatpb.InfluenceType_All:
139-
return len(be.reportedDispatchers) >= be.scheduler.TaskSize()
139+
return len(be.reportedDispatchers) >= be.controller.TaskSize()
140140
case heartbeatpb.InfluenceType_Normal:
141141
return len(be.reportedDispatchers) >= len(be.blockedTasks)
142142
}
@@ -150,7 +150,7 @@ func (be *BarrierEvent) sendPassAction() []*messaging.TargetMessage {
150150
msgMap := make(map[node.ID]*messaging.TargetMessage)
151151
switch be.blockedDispatchers.InfluenceType {
152152
case heartbeatpb.InfluenceType_DB:
153-
for _, stm := range be.scheduler.GetTasksBySchemaID(be.blockedDispatchers.SchemaID) {
153+
for _, stm := range be.controller.GetTasksBySchemaID(be.blockedDispatchers.SchemaID) {
154154
if stm.Primary == "" {
155155
continue
156156
}
@@ -160,7 +160,7 @@ func (be *BarrierEvent) sendPassAction() []*messaging.TargetMessage {
160160
}
161161
}
162162
case heartbeatpb.InfluenceType_All:
163-
for _, n := range be.scheduler.GetAllNodes() {
163+
for _, n := range be.controller.GetAllNodes() {
164164
msgMap[n] = be.newPassActionMessage(n)
165165
}
166166
case heartbeatpb.InfluenceType_Normal:
@@ -207,11 +207,7 @@ func (be *BarrierEvent) dispatcherReachedBlockTs(dispatcherID common.DispatcherI
207207
zap.String("dispatcher", dispatcherID.String()),
208208
zap.Uint64("commitTs", be.commitTs),
209209
zap.String("barrierType", be.blockedDispatchers.InfluenceType.String()))
210-
return &heartbeatpb.DispatcherAction{
211-
Action: heartbeatpb.Action_Write,
212-
CommitTs: be.commitTs,
213-
IsSyncPoint: be.isSyncPoint,
214-
}
210+
return be.action(heartbeatpb.Action_Write)
215211
}
216212
return nil
217213
}
@@ -228,7 +224,7 @@ func (be *BarrierEvent) resend() []*messaging.TargetMessage {
228224
// we select a dispatcher as the writer, still waiting for that dispatcher advance its checkpoint ts
229225
if !be.writerDispatcherAdvanced {
230226
//resend write action
231-
stm := be.scheduler.GetTask(be.writerDispatcher)
227+
stm := be.controller.GetTask(be.writerDispatcher)
232228
if stm == nil || stm.Primary == "" {
233229
return nil
234230
}
@@ -244,10 +240,7 @@ func (be *BarrierEvent) newWriterActionMessage(capture node.ID) *messaging.Targe
244240
ChangefeedID: be.cfID,
245241
DispatcherStatuses: []*heartbeatpb.DispatcherStatus{
246242
{
247-
Action: &heartbeatpb.DispatcherAction{
248-
Action: heartbeatpb.Action_Write,
249-
CommitTs: be.commitTs,
250-
},
243+
Action: be.action(heartbeatpb.Action_Write),
251244
InfluencedDispatchers: &heartbeatpb.InfluencedDispatchers{
252245
InfluenceType: heartbeatpb.InfluenceType_Normal,
253246
DispatcherIDs: []*heartbeatpb.DispatcherID{
@@ -264,11 +257,7 @@ func (be *BarrierEvent) newPassActionMessage(capture node.ID) *messaging.TargetM
264257
ChangefeedID: be.cfID,
265258
DispatcherStatuses: []*heartbeatpb.DispatcherStatus{
266259
{
267-
Action: &heartbeatpb.DispatcherAction{
268-
Action: heartbeatpb.Action_Pass,
269-
CommitTs: be.commitTs,
270-
IsSyncPoint: be.isSyncPoint,
271-
},
260+
Action: be.action(heartbeatpb.Action_Pass),
272261
InfluencedDispatchers: &heartbeatpb.InfluencedDispatchers{
273262
InfluenceType: be.blockedDispatchers.InfluenceType,
274263
SchemaID: be.blockedDispatchers.SchemaID,
@@ -277,3 +266,11 @@ func (be *BarrierEvent) newPassActionMessage(capture node.ID) *messaging.TargetM
277266
},
278267
}})
279268
}
269+
270+
func (be *BarrierEvent) action(action heartbeatpb.Action) *heartbeatpb.DispatcherAction {
271+
return &heartbeatpb.DispatcherAction{
272+
Action: action,
273+
CommitTs: be.commitTs,
274+
IsSyncPoint: be.isSyncPoint,
275+
}
276+
}

maintainer/barrier_event_test.go

+28-28
Original file line numberDiff line numberDiff line change
@@ -25,9 +25,9 @@ import (
2525
)
2626

2727
func TestScheduleEvent(t *testing.T) {
28-
sche := NewController("test", 1, nil, nil, nil, 1000, 0)
29-
sche.AddNewTable(commonEvent.Table{1, 1}, 1)
30-
event := NewBlockEvent("test", sche, &heartbeatpb.State{
28+
controller := NewController("test", 1, nil, nil, nil, 1000, 0)
29+
controller.AddNewTable(commonEvent.Table{SchemaID: 1, TableID: 1}, 1)
30+
event := NewBlockEvent("test", controller, &heartbeatpb.State{
3131
IsBlocked: true,
3232
BlockTs: 10,
3333
NeedDroppedTables: &heartbeatpb.InfluencedTables{
@@ -38,9 +38,9 @@ func TestScheduleEvent(t *testing.T) {
3838
})
3939
event.scheduleBlockEvent()
4040
//drop table will be executed first
41-
require.Len(t, sche.Absent(), 2)
41+
require.Len(t, controller.Absent(), 2)
4242

43-
event = NewBlockEvent("test", sche, &heartbeatpb.State{
43+
event = NewBlockEvent("test", controller, &heartbeatpb.State{
4444
IsBlocked: true,
4545
BlockTs: 10,
4646
NeedDroppedTables: &heartbeatpb.InfluencedTables{
@@ -51,9 +51,9 @@ func TestScheduleEvent(t *testing.T) {
5151
})
5252
event.scheduleBlockEvent()
5353
//drop table will be executed first, then add the new table
54-
require.Len(t, sche.Absent(), 1)
54+
require.Len(t, controller.Absent(), 1)
5555

56-
event = NewBlockEvent("test", sche, &heartbeatpb.State{
56+
event = NewBlockEvent("test", controller, &heartbeatpb.State{
5757
IsBlocked: true,
5858
BlockTs: 10,
5959
NeedDroppedTables: &heartbeatpb.InfluencedTables{
@@ -64,22 +64,22 @@ func TestScheduleEvent(t *testing.T) {
6464
})
6565
event.scheduleBlockEvent()
6666
//drop table will be executed first, then add the new table
67-
require.Len(t, sche.Absent(), 1)
67+
require.Len(t, controller.Absent(), 1)
6868
}
6969

7070
func TestResendAction(t *testing.T) {
71-
sche := NewController("test", 1, nil, nil, nil, 1000, 0)
72-
sche.AddNewNode("node1")
73-
sche.AddNewTable(commonEvent.Table{1, 1}, 1)
74-
sche.AddNewTable(commonEvent.Table{1, 2}, 1)
71+
controller := NewController("test", 1, nil, nil, nil, 1000, 0)
72+
controller.AddNewNode("node1")
73+
controller.AddNewTable(commonEvent.Table{SchemaID: 1, TableID: 1}, 1)
74+
controller.AddNewTable(commonEvent.Table{SchemaID: 1, TableID: 2}, 1)
7575
var dispatcherIDs []common.DispatcherID
76-
for key, stm := range sche.Absent() {
76+
for key, stm := range controller.Absent() {
7777
stm.Primary = "node1"
7878
stm.State = scheduler.SchedulerStatusWorking
79-
sche.tryMoveTask(key, stm, scheduler.SchedulerStatusAbsent, "", true)
79+
controller.tryMoveTask(key, stm, scheduler.SchedulerStatusAbsent, "", true)
8080
dispatcherIDs = append(dispatcherIDs, key)
8181
}
82-
event := NewBlockEvent("test", sche, &heartbeatpb.State{
82+
event := NewBlockEvent("test", controller, &heartbeatpb.State{
8383
IsBlocked: true,
8484
BlockTs: 10,
8585
BlockTables: &heartbeatpb.InfluencedTables{
@@ -105,7 +105,7 @@ func TestResendAction(t *testing.T) {
105105
msgs = event.resend()
106106
require.Len(t, msgs, 1)
107107

108-
event = NewBlockEvent("test", sche, &heartbeatpb.State{
108+
event = NewBlockEvent("test", controller, &heartbeatpb.State{
109109
IsBlocked: true,
110110
BlockTs: 10,
111111
BlockTables: &heartbeatpb.InfluencedTables{
@@ -123,7 +123,7 @@ func TestResendAction(t *testing.T) {
123123
require.Equal(t, resp.DispatcherStatuses[0].InfluencedDispatchers.InfluenceType, heartbeatpb.InfluenceType_DB)
124124
require.Equal(t, resp.DispatcherStatuses[0].Action.CommitTs, uint64(10))
125125

126-
event = NewBlockEvent("test", sche, &heartbeatpb.State{
126+
event = NewBlockEvent("test", controller, &heartbeatpb.State{
127127
IsBlocked: true,
128128
BlockTs: 10,
129129
BlockTables: &heartbeatpb.InfluencedTables{
@@ -141,7 +141,7 @@ func TestResendAction(t *testing.T) {
141141
require.Equal(t, resp.DispatcherStatuses[0].InfluencedDispatchers.InfluenceType, heartbeatpb.InfluenceType_All)
142142
require.Equal(t, resp.DispatcherStatuses[0].Action.CommitTs, uint64(10))
143143

144-
event = NewBlockEvent("test", sche, &heartbeatpb.State{
144+
event = NewBlockEvent("test", controller, &heartbeatpb.State{
145145
IsBlocked: true,
146146
BlockTs: 10,
147147
BlockTables: &heartbeatpb.InfluencedTables{
@@ -163,12 +163,12 @@ func TestResendAction(t *testing.T) {
163163
}
164164

165165
func TestUpdateSchemaID(t *testing.T) {
166-
sche := NewController("test", 1, nil, nil, nil, 1000, 0)
167-
sche.AddNewNode("node1")
168-
sche.AddNewTable(commonEvent.Table{1, 1}, 1)
169-
require.Len(t, sche.Absent(), 1)
170-
require.Len(t, sche.GetTasksBySchemaID(1), 1)
171-
event := NewBlockEvent("test", sche, &heartbeatpb.State{
166+
controller := NewController("test", 1, nil, nil, nil, 1000, 0)
167+
controller.AddNewNode("node1")
168+
controller.AddNewTable(commonEvent.Table{SchemaID: 1, TableID: 1}, 1)
169+
require.Len(t, controller.Absent(), 1)
170+
require.Len(t, controller.GetTasksBySchemaID(1), 1)
171+
event := NewBlockEvent("test", controller, &heartbeatpb.State{
172172
IsBlocked: true,
173173
BlockTs: 10,
174174
BlockTables: &heartbeatpb.InfluencedTables{
@@ -183,9 +183,9 @@ func TestUpdateSchemaID(t *testing.T) {
183183
}},
184184
)
185185
event.scheduleBlockEvent()
186-
require.Len(t, sche.Absent(), 1)
186+
require.Len(t, controller.Absent(), 1)
187187
// check the schema id and map is updated
188-
require.Len(t, sche.GetTasksBySchemaID(1), 0)
189-
require.Len(t, sche.GetTasksBySchemaID(2), 1)
190-
require.Equal(t, sche.GetTasksByTableIDs(1)[0].Inferior.(*ReplicaSet).SchemaID, int64(2))
188+
require.Len(t, controller.GetTasksBySchemaID(1), 0)
189+
require.Len(t, controller.GetTasksBySchemaID(2), 1)
190+
require.Equal(t, controller.GetTasksByTableIDs(1)[0].Inferior.(*ReplicaSet).SchemaID, int64(2))
191191
}

0 commit comments

Comments
 (0)