Skip to content

Commit e125c4c

Browse files
hongyunyan and sdojjy authored
Support Sync Point Event and Split Block Status Message from Heart Beat Request (#359)
Co-authored-by: jiangjianyuan <sdojjy@qq.com>
1 parent a9792e6 commit e125c4c

18 files changed

+1073
-353
lines changed

downstreamadapter/dispatcher/dispatcher.go

+36-23
Original file line numberDiff line numberDiff line change
@@ -66,8 +66,12 @@ type Dispatcher struct {
6666
tableSpan *heartbeatpb.TableSpan
6767
sink tisink.Sink
6868

69+
// TableSpanStatus is used to report checkpointTs / componentStatus to the Maintainer.
6970
statusesChan chan *heartbeatpb.TableSpanStatus
7071

72+
// TableSpanBlockStatus is used to report the block status of ddl / sync point events to the Maintainer.
73+
blockStatusesChan chan *heartbeatpb.TableSpanBlockStatus
74+
7175
SyncPointInfo *SyncPointInfo
7276

7377
componentStatus *ComponentStateWithMutex
@@ -97,6 +101,7 @@ func NewDispatcher(
97101
sink tisink.Sink,
98102
startTs uint64,
99103
statusesChan chan *heartbeatpb.TableSpanStatus,
104+
blockStatusesChan chan *heartbeatpb.TableSpanBlockStatus,
100105
filter filter.Filter,
101106
schemaID int64,
102107
schemaIDToDispatchers *SchemaIDToDispatchers,
@@ -106,6 +111,7 @@ func NewDispatcher(
106111
tableSpan: tableSpan,
107112
sink: sink,
108113
statusesChan: statusesChan,
114+
blockStatusesChan: blockStatusesChan,
109115
SyncPointInfo: syncPointInfo,
110116
componentStatus: newComponentStateWithMutex(heartbeatpb.ComponentState_Working),
111117
resolvedTs: newTsWithMutex(startTs),
@@ -176,12 +182,18 @@ func (d *Dispatcher) addBlockEventToSinkWhenAvailable(event commonEvent.BlockEve
176182
// 2. If the action is a pass, we just need to pass the event in tableProgress(for correct calculation) and wake the dispatcherEventsHandler
177183
func (d *Dispatcher) HandleDispatcherStatus(dispatcherStatus *heartbeatpb.DispatcherStatus) {
178184
if d.blockPendingEvent == nil {
185+
// receive outdated status
186+
// If status is about ack, ignore it.
187+
// If the status carries an action, we need to reply with a message showing that we have already finished the event.
179188
if dispatcherStatus.GetAction() != nil {
180-
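// This can only happen when the event has already advanced but a duplicate action message is still received; in that case, resend the heartbeat carrying the checkpointTs.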
181-
d.statusesChan <- &heartbeatpb.TableSpanStatus{
182-
ID: d.id.ToPB(),
183-
ComponentStatus: heartbeatpb.ComponentState_Working,
184-
CheckpointTs: d.GetCheckpointTs(),
189+
d.blockStatusesChan <- &heartbeatpb.TableSpanBlockStatus{
190+
ID: d.id.ToPB(),
191+
State: &heartbeatpb.State{
192+
IsBlocked: true,
193+
BlockTs: dispatcherStatus.GetAction().CommitTs,
194+
IsSyncPoint: dispatcherStatus.GetAction().IsSyncPoint,
195+
EventDone: true,
196+
},
185197
}
186198
}
187199
return
@@ -197,11 +209,15 @@ func (d *Dispatcher) HandleDispatcherStatus(dispatcherStatus *heartbeatpb.Dispat
197209
dispatcherEventDynamicStream := GetDispatcherEventsDynamicStream()
198210
dispatcherEventDynamicStream.Wake() <- d.id
199211
}
200-
d.statusesChan <- &heartbeatpb.TableSpanStatus{
201-
ID: d.id.ToPB(),
202-
ComponentStatus: heartbeatpb.ComponentState_Working,
203-
CheckpointTs: d.GetCheckpointTs(),
204-
}
212+
}
213+
d.blockStatusesChan <- &heartbeatpb.TableSpanBlockStatus{
214+
ID: d.id.ToPB(),
215+
State: &heartbeatpb.State{
216+
IsBlocked: true,
217+
BlockTs: dispatcherStatus.GetAction().CommitTs,
218+
IsSyncPoint: dispatcherStatus.GetAction().IsSyncPoint,
219+
EventDone: true,
220+
},
205221
}
206222
}
207223

@@ -258,28 +274,28 @@ func shouldBlock(event commonEvent.BlockEvent) bool {
258274
}
259275

260276
// 1. If the event is a single table DDL, it will be added to the sink for writing to downstream(async). If the ddl leads to adding new tables or dropping tables, it should also send a TableSpanBlockStatus message to the maintainer.
261-
// 2. If the event is a multi-table DDL / sync point Event, it will generate a TableSpanStatus message with ddl info to send to maintainer.
277+
// 2. If the event is a multi-table DDL / sync point Event, it will generate a TableSpanBlockStatus message with ddl info to send to maintainer.
262278
func (d *Dispatcher) DealWithBlockEventWhenProgressEmpty() {
263279
if !shouldBlock(d.blockPendingEvent) {
264280
d.sink.AddBlockEvent(d.blockPendingEvent, d.tableProgress)
265281
if d.blockPendingEvent.GetNeedAddedTables() != nil || d.blockPendingEvent.GetNeedDroppedTables() != nil {
266-
message := &heartbeatpb.TableSpanStatus{
267-
ID: d.id.ToPB(),
268-
ComponentStatus: heartbeatpb.ComponentState_Working,
282+
message := &heartbeatpb.TableSpanBlockStatus{
283+
ID: d.id.ToPB(),
269284
State: &heartbeatpb.State{
270285
IsBlocked: false,
271286
BlockTs: d.blockPendingEvent.GetCommitTs(),
272287
NeedDroppedTables: d.blockPendingEvent.GetNeedDroppedTables().ToPB(),
273288
NeedAddedTables: commonEvent.ToTablesPB(d.blockPendingEvent.GetNeedAddedTables()),
289+
IsSyncPoint: false, // a sync point event always blocks, so it never reaches this branch
290+
EventDone: false,
274291
},
275292
}
276293
d.SetResendTask(newResendTask(message, d))
277-
d.statusesChan <- message
294+
d.blockStatusesChan <- message
278295
}
279296
} else {
280-
message := &heartbeatpb.TableSpanStatus{
281-
ID: d.id.ToPB(),
282-
ComponentStatus: heartbeatpb.ComponentState_Working,
297+
message := &heartbeatpb.TableSpanBlockStatus{
298+
ID: d.id.ToPB(),
283299
State: &heartbeatpb.State{
284300
IsBlocked: true,
285301
BlockTs: d.blockPendingEvent.GetCommitTs(),
@@ -288,10 +304,11 @@ func (d *Dispatcher) DealWithBlockEventWhenProgressEmpty() {
288304
NeedAddedTables: commonEvent.ToTablesPB(d.blockPendingEvent.GetNeedAddedTables()),
289305
UpdatedSchemas: commonEvent.ToSchemaIDChangePB(d.blockPendingEvent.GetUpdatedSchemas()), // only exists for rename table and rename tables
290306
IsSyncPoint: d.blockPendingEvent.GetType() == commonEvent.TypeSyncPointEvent,
307+
EventDone: false,
291308
},
292309
}
293310
d.SetResendTask(newResendTask(message, d))
294-
d.statusesChan <- message
311+
d.blockStatusesChan <- message
295312
}
296313

297314
// dealing with events which update schema ids
@@ -344,10 +361,6 @@ func (d *Dispatcher) GetId() common.DispatcherID {
344361
return d.id
345362
}
346363

347-
func (d *Dispatcher) GetStatusesChan() chan *heartbeatpb.TableSpanStatus {
348-
return d.statusesChan
349-
}
350-
351364
func (d *Dispatcher) CancelResendTask() {
352365
if d.resendTask != nil {
353366
d.resendTask.Cancel()

downstreamadapter/dispatcher/helper.go

+8-8
Original file line numberDiff line numberDiff line change
@@ -190,7 +190,7 @@ func (h *DispatcherStatusHandler) Handle(dispatcher *Dispatcher, events ...Dispa
190190
// CheckProgressEmptyTask is responsible for checking whether the tableProgress is empty.
191191
// If the tableProgress is empty,
192192
// 1. If the event is a single table DDL, it will be added to the sink for writing to downstream(async).
193-
// 2. If the event is a multi-table DDL, it will generate a TableSpanStatus message with ddl info to send to maintainer.
193+
// 2. If the event is a multi-table DDL, it will generate a TableSpanBlockStatus message with ddl info to send to maintainer.
194194
// When the tableProgress is empty, the task will finish after this execution.
195195
// If the tableProgress is not empty, the task will be rescheduled after 10ms.
196196
type CheckProgressEmptyTask struct {
@@ -215,15 +215,15 @@ func (t *CheckProgressEmptyTask) Execute() time.Time {
215215
return time.Now().Add(10 * time.Millisecond)
216216
}
217217

218-
// Resend Task is reponsible for resending the TableSpanStatus message with ddl info to maintainer each 50ms.
218+
// ResendTask is responsible for resending the TableSpanBlockStatus message with ddl info to the maintainer every 200ms.
219219
// The task will be cancelled when the dispatcher receives the ack message from the maintainer.
220220
type ResendTask struct {
221-
message *heartbeatpb.TableSpanStatus
221+
message *heartbeatpb.TableSpanBlockStatus
222222
dispatcher *Dispatcher
223223
taskHandle *threadpool.TaskHandle
224224
}
225225

226-
func newResendTask(message *heartbeatpb.TableSpanStatus, dispatcher *Dispatcher) *ResendTask {
226+
func newResendTask(message *heartbeatpb.TableSpanBlockStatus, dispatcher *Dispatcher) *ResendTask {
227227
taskScheduler := GetDispatcherTaskScheduler()
228228
t := &ResendTask{
229229
message: message,
@@ -234,7 +234,7 @@ func newResendTask(message *heartbeatpb.TableSpanStatus, dispatcher *Dispatcher)
234234
}
235235

236236
func (t *ResendTask) Execute() time.Time {
237-
t.dispatcher.GetStatusesChan() <- t.message
237+
t.dispatcher.blockStatusesChan <- t.message
238238
return time.Now().Add(200 * time.Millisecond)
239239
}
240240

@@ -251,10 +251,10 @@ func (t *ResendTask) Cancel() {
251251
// b. If the tableProgress is not empty, we will generate a CheckProgressEmptyTask to periodically check whether the tableProgress is empty,
252252
// and then add the DDL event to the sink for writing to downstream(async).
253253
// 2. If it is a multi-table DDL,
254-
// a. If the tableProgress is empty(previous events are flushed successfully),We will generate a TableSpanStatus message with ddl info to send to maintainer.
254+
// a. If the tableProgress is empty (previous events are flushed successfully), we will generate a TableSpanBlockStatus message with ddl info to send to the maintainer.
255255
// b. If the tableProgress is not empty, we will generate a CheckProgressEmptyTask to periodically check whether the tableProgress is empty,
256-
// and then we will generate a TableSpanStatus message with ddl info to send to maintainer.
257-
// for the multi-table DDL, we will also generate a ResendTask to resend the TableSpanStatus message with ddl info to maintainer each 50ms to avoid message is missing.
256+
// and then we will generate a TableSpanBlockStatus message with ddl info to send to maintainer.
257+
// For the multi-table DDL, we will also generate a ResendTask to resend the TableSpanBlockStatus message with ddl info to the maintainer every 200ms in case the message is lost.
258258
//
259259
// Since ddl events are always written asynchronously, we need to stay blocked until the ddl event is flushed to downstream successfully.
260260
// Thus, we add a callback function to let the handler be woken when the ddl event is flushed to downstream successfully.

downstreamadapter/dispatchermanager/event_dispatcher_manager.go

+62-14
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,8 @@ One EventDispatcherManager can only have one Sink.
5454
type EventDispatcherManager struct {
5555
dispatcherMap *DispatcherMap
5656

57-
heartbeatRequestQueue *HeartbeatRequestQueue
57+
heartbeatRequestQueue *HeartbeatRequestQueue
58+
blockStatusRequestQueue *BlockStatusRequestQueue
5859

5960
cancel context.CancelFunc
6061
wg sync.WaitGroup
@@ -67,7 +68,11 @@ type EventDispatcherManager struct {
6768

6869
// statusesChan will fetch the tableSpan statuses that need to be contained in the heartbeat info.
6970
statusesChan chan *heartbeatpb.TableSpanStatus
70-
filter filter.Filter
71+
// blockStatusesChan will fetch the tableSpan block status about ddl event and sync point event
72+
// that need to be reported to the maintainer
73+
blockStatusesChan chan *heartbeatpb.TableSpanBlockStatus
74+
75+
filter filter.Filter
7176

7277
closing bool
7378
closed atomic.Bool
@@ -99,6 +104,7 @@ func NewEventDispatcherManager(changefeedID model.ChangeFeedID,
99104
changefeedID: changefeedID,
100105
maintainerID: maintainerID,
101106
statusesChan: make(chan *heartbeatpb.TableSpanStatus, 10000),
107+
blockStatusesChan: make(chan *heartbeatpb.TableSpanBlockStatus, 1000),
102108
cancel: cancel,
103109
config: cfConfig,
104110
schemaIDToDispatchers: dispatcher.NewSchemaIDToDispatchers(),
@@ -142,6 +148,12 @@ func NewEventDispatcherManager(changefeedID model.ChangeFeedID,
142148
defer manager.wg.Done()
143149
manager.CollectHeartbeatInfoWhenStatesChanged(ctx)
144150
}()
151+
152+
manager.wg.Add(1)
153+
go func() {
154+
defer manager.wg.Done()
155+
manager.CollectBlockStatusRequest(ctx)
156+
}()
145157
return manager
146158
}
147159

@@ -222,7 +234,7 @@ func (e *EventDispatcherManager) NewDispatcher(id common.DispatcherID, tableSpan
222234
syncPointInfo.SyncPointRetention = e.syncPointRetention
223235
}
224236

225-
dispatcher := dispatcher.NewDispatcher(id, tableSpan, e.sink, startTs, e.statusesChan, e.filter, schemaID, e.schemaIDToDispatchers, syncPointInfo)
237+
dispatcher := dispatcher.NewDispatcher(id, tableSpan, e.sink, startTs, e.statusesChan, e.blockStatusesChan, e.filter, schemaID, e.schemaIDToDispatchers, syncPointInfo)
226238

227239
if tableSpan.Equal(heartbeatpb.DDLSpan) {
228240
e.tableTriggerEventDispatcher = dispatcher
@@ -238,7 +250,7 @@ func (e *EventDispatcherManager) NewDispatcher(id common.DispatcherID, tableSpan
238250
},
239251
)
240252
e.dispatcherMap.Set(id, dispatcher)
241-
e.GetStatusesChan() <- &heartbeatpb.TableSpanStatus{
253+
e.statusesChan <- &heartbeatpb.TableSpanStatus{
242254
ID: id.ToPB(),
243255
ComponentStatus: heartbeatpb.ComponentState_Working,
244256
}
@@ -255,6 +267,42 @@ func (e *EventDispatcherManager) NewDispatcher(id common.DispatcherID, tableSpan
255267
return dispatcher
256268
}
257269

270+
func (e *EventDispatcherManager) CollectBlockStatusRequest(ctx context.Context) {
271+
for {
272+
blockStatusMessage := make([]*heartbeatpb.TableSpanBlockStatus, 0)
273+
select {
274+
case <-ctx.Done():
275+
return
276+
case blockStatus := <-e.blockStatusesChan:
277+
blockStatusMessage = append(blockStatusMessage, blockStatus)
278+
279+
delay := time.NewTimer(10 * time.Millisecond)
280+
loop:
281+
for {
282+
select {
283+
case blockStatus := <-e.blockStatusesChan:
284+
blockStatusMessage = append(blockStatusMessage, blockStatus)
285+
case <-delay.C:
286+
break loop
287+
}
288+
}
289+
290+
// Release resources promptly
291+
if !delay.Stop() {
292+
select {
293+
case <-delay.C:
294+
default:
295+
}
296+
}
297+
298+
var message heartbeatpb.BlockStatusRequest
299+
message.ChangefeedID = e.changefeedID.ID
300+
message.BlockStatuses = blockStatusMessage
301+
e.blockStatusRequestQueue.Enqueue(&BlockStatusRequestWithTargetID{TargetID: e.GetMaintainerID(), Request: &message})
302+
}
303+
}
304+
}
305+
258306
// CollectHeartbeatInfoWhenStatesChanged is used to collect the heartbeat info when statusesChan receives new statuses.
259307
// It happens when some dispatchers change status, such as --> working; --> stopped; --> stopping
260308
// Considering collect the heartbeat info is a time-consuming operation(we need to scan all the dispatchers),
@@ -265,14 +313,14 @@ func (e *EventDispatcherManager) CollectHeartbeatInfoWhenStatesChanged(ctx conte
265313
select {
266314
case <-ctx.Done():
267315
return
268-
case tableSpanStatus := <-e.GetStatusesChan():
316+
case tableSpanStatus := <-e.statusesChan:
269317
statusMessage = append(statusMessage, tableSpanStatus)
270318

271319
delay := time.NewTimer(10 * time.Millisecond)
272320
loop:
273321
for {
274322
select {
275-
case tableSpanStatus := <-e.GetStatusesChan():
323+
case tableSpanStatus := <-e.statusesChan:
276324
statusMessage = append(statusMessage, tableSpanStatus)
277325
case <-delay.C:
278326
break loop
@@ -290,7 +338,7 @@ func (e *EventDispatcherManager) CollectHeartbeatInfoWhenStatesChanged(ctx conte
290338
var message heartbeatpb.HeartBeatRequest
291339
message.ChangefeedID = e.changefeedID.ID
292340
message.Statuses = statusMessage
293-
e.GetHeartbeatRequestQueue().Enqueue(&HeartBeatRequestWithTargetID{TargetID: e.GetMaintainerID(), Request: &message})
341+
e.heartbeatRequestQueue.Enqueue(&HeartBeatRequestWithTargetID{TargetID: e.GetMaintainerID(), Request: &message})
294342
}
295343
}
296344
}
@@ -305,7 +353,7 @@ func (e *EventDispatcherManager) RemoveDispatcher(id common.DispatcherID) {
305353
appcontext.GetService[*eventcollector.EventCollector](appcontext.EventCollector).RemoveDispatcher(dispatcher)
306354
dispatcher.Remove()
307355
} else {
308-
e.GetStatusesChan() <- &heartbeatpb.TableSpanStatus{
356+
e.statusesChan <- &heartbeatpb.TableSpanStatus{
309357
ID: id.ToPB(),
310358
ComponentStatus: heartbeatpb.ComponentState_Stopped,
311359
}
@@ -422,16 +470,16 @@ func (e *EventDispatcherManager) GetChangeFeedID() model.ChangeFeedID {
422470
return e.changefeedID
423471
}
424472

425-
func (e *EventDispatcherManager) GetHeartbeatRequestQueue() *HeartbeatRequestQueue {
426-
return e.heartbeatRequestQueue
427-
}
428-
429473
func (e *EventDispatcherManager) SetHeartbeatRequestQueue(heartbeatRequestQueue *HeartbeatRequestQueue) {
430474
e.heartbeatRequestQueue = heartbeatRequestQueue
431475
}
432476

433-
func (e *EventDispatcherManager) GetStatusesChan() chan *heartbeatpb.TableSpanStatus {
434-
return e.statusesChan
477+
func (e *EventDispatcherManager) SetBlockStatusRequestQueue(blockStatusRequestQueue *BlockStatusRequestQueue) {
478+
e.blockStatusRequestQueue = blockStatusRequestQueue
479+
}
480+
481+
func (e *EventDispatcherManager) GetBlockStatuses() chan *heartbeatpb.TableSpanBlockStatus {
482+
return e.blockStatusesChan
435483
}
436484

437485
func (e *EventDispatcherManager) SetMaintainerID(maintainerID node.ID) {

0 commit comments

Comments
 (0)