Skip to content

Commit 216a5ca

Browse files
committed
refine barrier
1 parent cd41921 commit 216a5ca

File tree

2 files changed

+6
-10
lines changed

2 files changed

+6
-10
lines changed

maintainer/barrier.go

+5-9
Original file line numberDiff line numberDiff line change
@@ -92,20 +92,16 @@ func (b *Barrier) handleNoStateHeartbeat(dispatcherID common.DispatcherID, check
9292
}
9393
// no block event send ,but reached the block point
9494
if checkpointTs == event.commitTs {
95-
event.advancedDispatchers[dispatcherID] = true
95+
action := event.dispatcherReachedBlockTs(dispatcherID)
9696
// all dispatcher reported heartbeat, select one to write
97-
if !event.selected && event.allDispatcherReported() {
97+
if action != nil {
9898
dispatcherStatus := &heartbeatpb.DispatcherStatus{
9999
InfluencedDispatchers: &heartbeatpb.InfluencedDispatchers{
100100
InfluenceType: heartbeatpb.InfluenceType_Normal,
101-
DispatcherIDs: []*heartbeatpb.DispatcherID{dispatcherID.ToPB()},
101+
DispatcherIDs: []*heartbeatpb.DispatcherID{event.writerDispatcher.ToPB()},
102102
},
103-
Action: &heartbeatpb.DispatcherAction{
104-
Action: heartbeatpb.Action_Write,
105-
CommitTs: event.commitTs,
106-
}}
107-
event.writerDispatcher = dispatcherID
108-
event.selected = true
103+
Action: action,
104+
}
109105
return dispatcherStatus
110106
}
111107
}

maintainer/barrier_event.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -174,7 +174,7 @@ func (b *BarrierEvent) dispatcherReachedBlockTs(dispatcherID common.DispatcherID
174174
return nil
175175
}
176176
b.advancedDispatchers[dispatcherID] = true
177-
// all dispatcher reported heartbeat, select one to write
177+
// all dispatcher reported heartbeat, select the last one to write
178178
if b.allDispatcherReported() {
179179
b.writerDispatcher = dispatcherID
180180
b.selected = true

0 commit comments

Comments
 (0)