Skip to content

Commit b1d8313

Browse files
authored
add IsSyncPoint as a part of barrier event key (#360)
1 parent e125c4c commit b1d8313

File tree

3 files changed

+78
-54
lines changed

3 files changed

+78
-54
lines changed

maintainer/barrier.go

+39-28
Original file line numberDiff line numberDiff line change
@@ -23,24 +23,29 @@ import (
2323
// Barrier manages the block events for the changefeed
2424
// the block event processing logic:
2525
// 1. dispatcher report an event to maintainer, like ddl, sync point
26-
// 2. maintainer wait for all dispatchers reporting block event (all dispatchers will report the same event)
26+
// 2. maintainer wait for all dispatchers reporting block event (all dispatchers must report the same event)
2727
// 3. maintainer choose one dispatcher to write (take an action) the event to downstream, (resend logic is needed)
28-
// 4. maintainer wait for the selected dispatcher advance its checkpoint ts, checkpoint ts >= block ts,(means it already finished the write action), (resend logic is needed)
28+
// 4. maintainer wait for the selected dispatcher reporting event(write) done message (resend logic is needed)
2929
// 5. maintainer send pass action to all other dispatchers. (resend logic is needed)
30-
// 6. maintainer wait for all dispatchers advance checkpoints, and cleanup memory
30+
// 6. maintainer wait for all dispatchers reporting event(pass) done message
31+
// 7. maintainer clear the event
3132
type Barrier struct {
32-
blockedTs map[uint64]*BarrierEvent
33+
blockedTs map[eventKey]*BarrierEvent
3334
controller *Controller
34-
// if maintainer is down, the barrier will be re-built, so we can use the dispatcher as the key
35-
blockedDispatcher map[common.DispatcherID]*BarrierEvent
35+
}
36+
37+
// eventKey is the key of the block event,
38+
// the ddl and sync point are identified by the blockTs and isSyncPoint since they can share the same blockTs
39+
type eventKey struct {
40+
blockTs uint64
41+
isSyncPoint bool
3642
}
3743

3844
// NewBarrier creates a new barrier for the changefeed
3945
func NewBarrier(controller *Controller) *Barrier {
4046
return &Barrier{
41-
blockedTs: make(map[uint64]*BarrierEvent),
42-
blockedDispatcher: make(map[common.DispatcherID]*BarrierEvent),
43-
controller: controller,
47+
blockedTs: make(map[eventKey]*BarrierEvent),
48+
controller: controller,
4449
}
4550
}
4651

@@ -79,14 +84,15 @@ func (b *Barrier) Resend() []*messaging.TargetMessage {
7984
func (b *Barrier) handleOneStatus(changefeedID string, status *heartbeatpb.TableSpanBlockStatus) *heartbeatpb.DispatcherStatus {
8085
dispatcherID := common.NewDispatcherIDFromPB(status.ID)
8186
if status.State.EventDone {
82-
b.handleEventDone(dispatcherID)
87+
b.handleEventDone(dispatcherID, status)
8388
return nil
8489
}
8590
return b.handleBlockState(changefeedID, dispatcherID, status)
8691
}
8792

88-
func (b *Barrier) handleEventDone(dispatcherID common.DispatcherID) {
89-
event, ok := b.blockedDispatcher[dispatcherID]
93+
func (b *Barrier) handleEventDone(dispatcherID common.DispatcherID, status *heartbeatpb.TableSpanBlockStatus) {
94+
key := getEventKey(status.State)
95+
event, ok := b.blockedTs[key]
9096
// no block event found
9197
if !ok {
9298
return
@@ -103,10 +109,10 @@ func (b *Barrier) handleEventDone(dispatcherID common.DispatcherID) {
103109
}
104110

105111
// the dispatcher reported event done, remove it from the event, so we do not need to resend the message anymore
106-
delete(b.blockedDispatcher, dispatcherID)
107-
// all blocked dispatchers are advanced checkpoint ts
108-
if len(b.blockedDispatcher) == 0 {
109-
delete(b.blockedTs, event.commitTs)
112+
event.markDispatcherEventDone(dispatcherID)
113+
// all blocked dispatchers have reported event done, we can clean up the event
114+
if event.allDispatcherDone() {
115+
delete(b.blockedTs, key)
110116
}
111117
}
112118

@@ -121,33 +127,38 @@ func (b *Barrier) handleBlockState(changefeedID string,
121127
dispatcherID.ToPB(),
122128
},
123129
},
124-
Ack: &heartbeatpb.ACK{CommitTs: blockState.BlockTs},
130+
Ack: &heartbeatpb.ACK{CommitTs: blockState.BlockTs, IsSyncPoint: blockState.IsSyncPoint},
125131
}
126132
if blockState.IsBlocked {
133+
key := getEventKey(blockState)
127134
// insert a new event, or get the existing one if the event is already tracked
128-
event := b.getOrInsertNewEvent(changefeedID, dispatcherID, blockState)
135+
event := b.getOrInsertNewEvent(changefeedID, key, blockState)
129136
// check if all dispatchers already reported the block event, and check whether we need to send write action
130137
dispatcherStatus.Action = event.dispatcherReachedBlockTs(dispatcherID)
131138
} else {
132139
// it's not a blocked event, it must be sent by table event trigger dispatcher
133-
// the ddl already synced to downstream , e.g.: create table, drop table
134-
// if ack failed, dispatcher will send a heartbeat again
140+
// and the ddl already synced to downstream , e.g.: create table, drop table
141+
// if ack failed, dispatcher will send a heartbeat again, so we do not need to care about resend message here
135142
NewBlockEvent(changefeedID, b.controller, blockState).scheduleBlockEvent()
136143
}
137144
return dispatcherStatus
138145
}
139146

140-
func (b *Barrier) getOrInsertNewEvent(changefeedID string,
141-
dispatcherID common.DispatcherID,
147+
// getOrInsertNewEvent gets the block event from the map, if not found, creates a new one
148+
func (b *Barrier) getOrInsertNewEvent(changefeedID string, key eventKey,
142149
blockState *heartbeatpb.State) *BarrierEvent {
143-
event, ok := b.blockedTs[blockState.BlockTs]
150+
event, ok := b.blockedTs[key]
144151
if !ok {
145152
event = NewBlockEvent(changefeedID, b.controller, blockState)
146-
b.blockedTs[blockState.BlockTs] = event
147-
}
148-
_, ok = b.blockedDispatcher[dispatcherID]
149-
if !ok {
150-
b.blockedDispatcher[dispatcherID] = event
153+
b.blockedTs[key] = event
151154
}
152155
return event
153156
}
157+
158+
// getEventKey returns the key of the block event
159+
func getEventKey(blockState *heartbeatpb.State) eventKey {
160+
return eventKey{
161+
blockTs: blockState.BlockTs,
162+
isSyncPoint: blockState.IsSyncPoint,
163+
}
164+
}

maintainer/barrier_event.go

+11-5
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ type BarrierEvent struct {
4545
schemaIDChange []*heartbeatpb.SchemaIDChange
4646
isSyncPoint bool
4747

48-
reportedDispatchers map[common.DispatcherID]bool
48+
reportedDispatchers map[common.DispatcherID]struct{}
4949
lastResendTime time.Time
5050
}
5151

@@ -60,7 +60,7 @@ func NewBlockEvent(cfID string, scheduler *Controller,
6060
newTables: status.NeedAddedTables,
6161
dropDispatchers: status.NeedDroppedTables,
6262
schemaIDChange: status.UpdatedSchemas,
63-
reportedDispatchers: make(map[common.DispatcherID]bool),
63+
reportedDispatchers: make(map[common.DispatcherID]struct{}),
6464
lastResendTime: time.Time{},
6565
isSyncPoint: status.IsSyncPoint,
6666
}
@@ -117,6 +117,14 @@ func (be *BarrierEvent) scheduleBlockEvent() {
117117
}
118118
}
119119

120+
func (be *BarrierEvent) markDispatcherEventDone(id common.DispatcherID) {
121+
delete(be.reportedDispatchers, id)
122+
}
123+
124+
func (be *BarrierEvent) allDispatcherDone() bool {
125+
return len(be.reportedDispatchers) == 0
126+
}
127+
120128
func (be *BarrierEvent) allDispatcherReported() bool {
121129
if be.blockedDispatchers == nil {
122130
return true
@@ -188,13 +196,11 @@ func (be *BarrierEvent) dispatcherReachedBlockTs(dispatcherID common.DispatcherI
188196
if be.selected {
189197
return nil
190198
}
191-
be.reportedDispatchers[dispatcherID] = true
199+
be.reportedDispatchers[dispatcherID] = struct{}{}
192200
// all dispatcher reported heartbeat, select the last one to write
193201
if be.allDispatcherReported() {
194202
be.writerDispatcher = dispatcherID
195203
be.selected = true
196-
// release memory
197-
be.reportedDispatchers = nil
198204

199205
log.Info("all dispatcher reported heartbeat, select one to write",
200206
zap.String("changefeed", be.cfID),

maintainer/barrier_test.go

+28-21
Original file line numberDiff line numberDiff line change
@@ -53,14 +53,17 @@ func TestOneBlockEvent(t *testing.T) {
5353
},
5454
})
5555
require.NotNil(t, msg)
56+
key := eventKey{
57+
blockTs: 10,
58+
isSyncPoint: true,
59+
}
5660
resp := msg.Message[0].(*heartbeatpb.HeartBeatResponse)
57-
require.Equal(t, barrier.blockedDispatcher[stm.ID], barrier.blockedTs[10])
58-
event := barrier.blockedTs[10]
61+
event := barrier.blockedTs[key]
5962
require.Equal(t, uint64(10), event.commitTs)
6063
require.True(t, event.writerDispatcher == stm.ID)
6164
require.True(t, event.selected)
6265
require.False(t, event.writerDispatcherAdvanced)
63-
require.Nil(t, event.reportedDispatchers)
66+
require.Len(t, event.reportedDispatchers, 1)
6467
require.Equal(t, resp.DispatcherStatuses[0].Ack.CommitTs, uint64(10))
6568
require.Equal(t, resp.DispatcherStatuses[0].Action.CommitTs, uint64(10))
6669
require.Equal(t, resp.DispatcherStatuses[0].Action.Action, heartbeatpb.Action_Write)
@@ -72,6 +75,7 @@ func TestOneBlockEvent(t *testing.T) {
7275
{
7376
ID: stm.ID.ToPB(),
7477
State: &heartbeatpb.State{
78+
BlockTs: 10,
7579
IsBlocked: true,
7680
EventDone: true,
7781
IsSyncPoint: true,
@@ -80,7 +84,6 @@ func TestOneBlockEvent(t *testing.T) {
8084
},
8185
})
8286
require.Len(t, barrier.blockedTs, 0)
83-
require.Len(t, barrier.blockedDispatcher, 0)
8487
}
8588

8689
func TestNormalBlock(t *testing.T) {
@@ -171,11 +174,14 @@ func TestNormalBlock(t *testing.T) {
171174
},
172175
})
173176
require.NotNil(t, msg)
174-
require.Equal(t, barrier.blockedDispatcher[selectDispatcherID], barrier.blockedTs[10])
175-
event := barrier.blockedTs[10]
177+
key := eventKey{
178+
blockTs: 10,
179+
isSyncPoint: false,
180+
}
181+
event := barrier.blockedTs[key]
176182
require.Equal(t, uint64(10), event.commitTs)
177183
require.True(t, event.writerDispatcher == selectDispatcherID)
178-
require.Nil(t, event.reportedDispatchers)
184+
require.Len(t, event.reportedDispatchers, 3)
179185

180186
// repeated status
181187
barrier.HandleStatus("node1", &heartbeatpb.BlockStatusRequest{
@@ -217,7 +223,7 @@ func TestNormalBlock(t *testing.T) {
217223
})
218224
require.Equal(t, uint64(10), event.commitTs)
219225
require.True(t, event.writerDispatcher == selectDispatcherID)
220-
require.Nil(t, event.reportedDispatchers)
226+
require.Len(t, event.reportedDispatchers, 3)
221227

222228
// selected node write done
223229
msg = barrier.HandleStatus("node2", &heartbeatpb.BlockStatusRequest{
@@ -234,7 +240,7 @@ func TestNormalBlock(t *testing.T) {
234240
},
235241
})
236242
require.Len(t, barrier.blockedTs, 1)
237-
require.Len(t, barrier.blockedDispatcher, 2)
243+
require.Len(t, event.reportedDispatchers, 2)
238244
msg = barrier.HandleStatus("node1", &heartbeatpb.BlockStatusRequest{
239245
ChangefeedID: "test",
240246
BlockStatuses: []*heartbeatpb.TableSpanBlockStatus{
@@ -257,7 +263,6 @@ func TestNormalBlock(t *testing.T) {
257263
},
258264
})
259265
require.Len(t, barrier.blockedTs, 0)
260-
require.Len(t, barrier.blockedDispatcher, 0)
261266
}
262267

263268
func TestSchemaBlock(t *testing.T) {
@@ -336,7 +341,8 @@ func TestSchemaBlock(t *testing.T) {
336341
require.True(t, resp.DispatcherStatuses[0].Ack.CommitTs == 10)
337342
require.True(t, resp.DispatcherStatuses[0].Action.CommitTs == 10)
338343
require.True(t, resp.DispatcherStatuses[0].Action.Action == heartbeatpb.Action_Write)
339-
event := barrier.blockedTs[10]
344+
key := eventKey{blockTs: 10}
345+
event := barrier.blockedTs[key]
340346
require.Equal(t, uint64(10), event.commitTs)
341347
//the last one will be the writer
342348
require.Equal(t, event.writerDispatcher.ToPB(), dispatcherIDs[1])
@@ -367,7 +373,7 @@ func TestSchemaBlock(t *testing.T) {
367373
resp = msg.Message[0].(*heartbeatpb.HeartBeatResponse)
368374
require.Len(t, resp.DispatcherStatuses, 1)
369375
require.True(t, resp.DispatcherStatuses[0].Ack.CommitTs == 10)
370-
event = barrier.blockedTs[10]
376+
event = barrier.blockedTs[key]
371377
require.Equal(t, uint64(10), event.commitTs)
372378
//the last one will be the writer
373379
require.Equal(t, event.writerDispatcher.ToPB(), dispatcherIDs[1])
@@ -395,7 +401,7 @@ func TestSchemaBlock(t *testing.T) {
395401
heartbeatpb.Action_Pass)
396402
require.Len(t, barrier.blockedTs, 1)
397403
// the writer already advanced
398-
require.Len(t, barrier.blockedDispatcher, 1)
404+
require.Len(t, event.reportedDispatchers, 1)
399405
require.Equal(t, 1, len(sche.Absent()))
400406
require.Equal(t, 0, len(sche.Commiting()))
401407
require.Equal(t, 2, len(sche.Removing()))
@@ -415,7 +421,6 @@ func TestSchemaBlock(t *testing.T) {
415421
},
416422
})
417423
require.Len(t, barrier.blockedTs, 0)
418-
require.Len(t, barrier.blockedDispatcher, 0)
419424
}
420425

421426
func TestSyncPointBlock(t *testing.T) {
@@ -457,6 +462,7 @@ func TestSyncPointBlock(t *testing.T) {
457462
TableIDs: dropTables,
458463
},
459464
NeedAddedTables: []*heartbeatpb.Table{newSpan},
465+
IsSyncPoint: true,
460466
},
461467
},
462468
{
@@ -473,6 +479,7 @@ func TestSyncPointBlock(t *testing.T) {
473479
TableIDs: dropTables,
474480
},
475481
NeedAddedTables: []*heartbeatpb.Table{newSpan},
482+
IsSyncPoint: true,
476483
},
477484
},
478485
},
@@ -511,7 +518,8 @@ func TestSyncPointBlock(t *testing.T) {
511518
require.True(t, resp.DispatcherStatuses[0].Ack.CommitTs == 10)
512519
require.True(t, resp.DispatcherStatuses[0].Action.CommitTs == 10)
513520
require.True(t, resp.DispatcherStatuses[0].Action.Action == heartbeatpb.Action_Write)
514-
event := barrier.blockedTs[10]
521+
key := eventKey{blockTs: 10, isSyncPoint: true}
522+
event := barrier.blockedTs[key]
515523
require.Equal(t, uint64(10), event.commitTs)
516524
//the last one will be the writer
517525
require.Equal(t, event.writerDispatcher.ToPB(), dispatcherIDs[2])
@@ -526,7 +534,7 @@ func TestSyncPointBlock(t *testing.T) {
526534
IsBlocked: true,
527535
BlockTs: 10,
528536
EventDone: true,
529-
IsSyncPoint: false,
537+
IsSyncPoint: true,
530538
},
531539
},
532540
},
@@ -536,7 +544,7 @@ func TestSyncPointBlock(t *testing.T) {
536544
require.Len(t, msgs, 2)
537545
require.Len(t, barrier.blockedTs, 1)
538546
// the writer already advanced
539-
require.Len(t, barrier.blockedDispatcher, 2)
547+
require.Len(t, event.reportedDispatchers, 2)
540548
// other dispatcher advanced checkpoint ts
541549
msg = barrier.HandleStatus("node1", &heartbeatpb.BlockStatusRequest{
542550
ChangefeedID: "test",
@@ -547,7 +555,7 @@ func TestSyncPointBlock(t *testing.T) {
547555
IsBlocked: true,
548556
BlockTs: 10,
549557
EventDone: true,
550-
IsSyncPoint: false,
558+
IsSyncPoint: true,
551559
},
552560
},
553561
{
@@ -556,13 +564,12 @@ func TestSyncPointBlock(t *testing.T) {
556564
IsBlocked: true,
557565
BlockTs: 10,
558566
EventDone: true,
559-
IsSyncPoint: false,
567+
IsSyncPoint: true,
560568
},
561569
},
562570
},
563571
})
564572
require.Len(t, barrier.blockedTs, 0)
565-
require.Len(t, barrier.blockedDispatcher, 0)
566573
}
567574

568575
func TestNonBlocked(t *testing.T) {
@@ -601,7 +608,6 @@ func TestNonBlocked(t *testing.T) {
601608
require.True(t, heartbeatpb.InfluenceType_Normal == resp.DispatcherStatuses[0].InfluencedDispatchers.InfluenceType)
602609
require.Equal(t, resp.DispatcherStatuses[0].InfluencedDispatchers.DispatcherIDs[0], blockedDispatcherIDS[0])
603610
require.Len(t, barrier.blockedTs, 0)
604-
require.Len(t, barrier.blockedDispatcher, 0)
605611
require.Len(t, barrier.controller.Absent(), 2)
606612
}
607613

@@ -630,6 +636,7 @@ func TestSyncPointBlockPerf(t *testing.T) {
630636
InfluenceType: heartbeatpb.InfluenceType_All,
631637
SchemaID: 1,
632638
},
639+
IsSyncPoint: true,
633640
},
634641
})
635642
}

0 commit comments

Comments
 (0)