Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update the usage of dynamic stream in dispatchers #372

Merged
merged 4 commits into from
Oct 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
121 changes: 58 additions & 63 deletions downstreamadapter/dispatcher/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,7 @@ type Dispatcher struct {

tableProgress *types.TableProgress

resendTask *ResendTask
checkTableProgressEmptyTask *CheckProgressEmptyTask
resendTask *ResendTask

schemaIDToDispatchers *SchemaIDToDispatchers
schemaID int64
Expand Down Expand Up @@ -145,37 +144,6 @@ func NewDispatcher(
return dispatcher
}

// 1. 如果是单表内的 ddl,达到下推的条件为: sink 中没有还没执行完的当前表的 event
// 2. 如果是多表内的 ddl 或者是表间的 ddl,则需要满足的条件为:
// 2.1 sink 中没有还没执行完的当前表的 event
// 2.2 maintainer 通知自己可以 write 或者 pass event
//
// TODO:特殊处理有 add index 的逻辑
// Block Event including ddl Event and Sync Point Event
func (d *Dispatcher) addBlockEventToSinkWhenAvailable(event commonEvent.BlockEvent) {
// 根据 filter 过滤 query 中不需要 send to downstream 的数据
// 但应当不出现整个 query 都不需要 send to downstream 的 ddl,这种 ddl 不应该发给 dispatcher
// TODO: ddl 影响到的 tableSpan 也在 filter 中过滤一遍
if event.GetType() == commonEvent.TypeDDLEvent {
ddlEvent := event.(*commonEvent.DDLEvent)
// TODO:看一下这种写法有没有问题,加个测试后面
err := d.filter.FilterDDLEvent(ddlEvent)
if err != nil {
log.Error("filter ddl query failed", zap.Error(err))
// 这里怎么处理更合适呢?有错然后反上去让 changefeed 报错
return
}
}

d.blockPendingEvent = event

if d.tableProgress.Empty() {
d.DealWithBlockEventWhenProgressEmpty()
} else {
d.checkTableProgressEmptyTask = newCheckProgressEmptyTask(d)
}
}

// Each dispatcher status may contain a ACK info or a dispatcher action or both.
// If we get a ack info, we need to check whether the ack is for the current pending ddl event. If so, we can cancel the resend task.
// If we get a dispatcher action, we need to check whether the action is for the current pending ddl event. If so, we can deal the ddl event based on the action.
Expand Down Expand Up @@ -228,37 +196,63 @@ func (d *Dispatcher) HandleDispatcherStatus(dispatcherStatus *heartbeatpb.Dispat
}
}

func (d *Dispatcher) HandleEvent(event commonEvent.Event) (block bool) {
switch event.GetType() {
case commonEvent.TypeResolvedEvent:
d.resolvedTs.Set(event.(commonEvent.ResolvedEvent).ResolvedTs)
return false
case commonEvent.TypeDMLEvent:
d.sink.AddDMLEvent(event.(*commonEvent.DMLEvent), d.tableProgress)
return false
case commonEvent.TypeDDLEvent:
event := event.(*commonEvent.DDLEvent)
if d.tableNameStore != nil {
d.tableNameStore.AddEvent(event)
// HandleEvents can batch handle events about resolvedTs Event and DML Event.
// While for DDLEvent and SyncPointEvent, they should be handled singly,
// because they are block events.
// We ensure we only will receive one event when it's ddl event or sync point event by IsBatchable() function.
// When we handle events, we don't have any previous events still in sink.
func (d *Dispatcher) HandleEvents(dispatcherEvents []DispatcherEvent) (block bool) {
onlyResolvedTs := true
for _, dispatcherEvent := range dispatcherEvents {
event := dispatcherEvent.Event
switch event.GetType() {
case commonEvent.TypeResolvedEvent:
d.resolvedTs.Set(event.(commonEvent.ResolvedEvent).ResolvedTs)
case commonEvent.TypeDMLEvent:
onlyResolvedTs = false
event := event.(*commonEvent.DMLEvent)
event.AddPostFlushFunc(func() {
// Considering dml event in sink may be write to downstream not in order,
// thus, we use tableProgress.Empty() to ensure these events are flushed to downstream completely
// and wake dynamic stream to handle the next events.
if d.tableProgress.Empty() {
dispatcherEventDynamicStream := GetDispatcherEventsDynamicStream()
dispatcherEventDynamicStream.Wake() <- event.GetDispatcherID()
}
})
d.sink.AddDMLEvent(event, d.tableProgress)
case commonEvent.TypeDDLEvent:
if len(dispatcherEvents) != 1 {
log.Error("ddl event should only be singly handled", zap.Any("dispatcherID", d.id))
}
onlyResolvedTs = false

event := event.(*commonEvent.DDLEvent)
if d.tableNameStore != nil {
d.tableNameStore.AddEvent(event)
}
event.AddPostFlushFunc(func() {
dispatcherEventDynamicStream := GetDispatcherEventsDynamicStream()
dispatcherEventDynamicStream.Wake() <- event.GetDispatcherID()
})
d.dealWithBlockEvent(event)
case commonEvent.TypeSyncPointEvent:
if len(dispatcherEvents) != 1 {
log.Error("sync point event should only be singly handled", zap.Any("dispatcherID", d.id))
}
onlyResolvedTs = false
event := event.(*commonEvent.SyncPointEvent)
event.AddPostFlushFunc(func() {
dispatcherEventDynamicStream := GetDispatcherEventsDynamicStream()
dispatcherEventDynamicStream.Wake() <- event.GetDispatcherID()
})
d.dealWithBlockEvent(event)
default:
log.Error("invalid event type", zap.Any("event Type", event.GetType()))
return false
}
event.AddPostFlushFunc(func() {
dispatcherEventDynamicStream := GetDispatcherEventsDynamicStream()
dispatcherEventDynamicStream.Wake() <- event.GetDispatcherID()
})
d.addBlockEventToSinkWhenAvailable(event)
return true
case commonEvent.TypeSyncPointEvent:
event := event.(*commonEvent.SyncPointEvent)
event.AddPostFlushFunc(func() {
dispatcherEventDynamicStream := GetDispatcherEventsDynamicStream()
dispatcherEventDynamicStream.Wake() <- event.GetDispatcherID()
})
d.addBlockEventToSinkWhenAvailable(event)
return true
default:
log.Error("invalid event type", zap.Any("event Type", event.GetType()))
}
return false
return !onlyResolvedTs
}

func shouldBlock(event commonEvent.BlockEvent) bool {
Expand All @@ -276,7 +270,8 @@ func shouldBlock(event commonEvent.BlockEvent) bool {

// 1.If the event is a single table DDL, it will be added to the sink for writing to downstream(async). If the ddl leads to add new tables or drop tables, it should send heartbeat to maintainer
// 2. If the event is a multi-table DDL / sync point Event, it will generate a TableSpanBlockStatus message with ddl info to send to maintainer.
func (d *Dispatcher) DealWithBlockEventWhenProgressEmpty() {
func (d *Dispatcher) dealWithBlockEvent(event commonEvent.BlockEvent) {
d.blockPendingEvent = event
if !shouldBlock(d.blockPendingEvent) {
d.sink.AddBlockEvent(d.blockPendingEvent, d.tableProgress)
if d.blockPendingEvent.GetNeedAddedTables() != nil || d.blockPendingEvent.GetNeedDroppedTables() != nil {
Expand Down
Loading
Loading