Skip to content

Commit f5162aa

Browse files
authored
Update the usage of dynamic stream in dispatchers (#372)
1 parent 0ae2552 commit f5162aa

File tree

14 files changed

+268
-199
lines changed

14 files changed

+268
-199
lines changed

downstreamadapter/dispatcher/dispatcher.go

+58-63
Original file line numberDiff line numberDiff line change
@@ -86,8 +86,7 @@ type Dispatcher struct {
8686

8787
tableProgress *types.TableProgress
8888

89-
resendTask *ResendTask
90-
checkTableProgressEmptyTask *CheckProgressEmptyTask
89+
resendTask *ResendTask
9190

9291
schemaIDToDispatchers *SchemaIDToDispatchers
9392
schemaID int64
@@ -145,37 +144,6 @@ func NewDispatcher(
145144
return dispatcher
146145
}
147146

148-
// 1. 如果是单表内的 ddl,达到下推的条件为: sink 中没有还没执行完的当前表的 event
149-
// 2. 如果是多表内的 ddl 或者是表间的 ddl,则需要满足的条件为:
150-
// 2.1 sink 中没有还没执行完的当前表的 event
151-
// 2.2 maintainer 通知自己可以 write 或者 pass event
152-
//
153-
// TODO:特殊处理有 add index 的逻辑
154-
// Block Event including ddl Event and Sync Point Event
155-
func (d *Dispatcher) addBlockEventToSinkWhenAvailable(event commonEvent.BlockEvent) {
156-
// 根据 filter 过滤 query 中不需要 send to downstream 的数据
157-
// 但应当不出现整个 query 都不需要 send to downstream 的 ddl,这种 ddl 不应该发给 dispatcher
158-
// TODO: ddl 影响到的 tableSpan 也在 filter 中过滤一遍
159-
if event.GetType() == commonEvent.TypeDDLEvent {
160-
ddlEvent := event.(*commonEvent.DDLEvent)
161-
// TODO:看一下这种写法有没有问题,加个测试后面
162-
err := d.filter.FilterDDLEvent(ddlEvent)
163-
if err != nil {
164-
log.Error("filter ddl query failed", zap.Error(err))
165-
// 这里怎么处理更合适呢?有错然后反上去让 changefeed 报错
166-
return
167-
}
168-
}
169-
170-
d.blockPendingEvent = event
171-
172-
if d.tableProgress.Empty() {
173-
d.DealWithBlockEventWhenProgressEmpty()
174-
} else {
175-
d.checkTableProgressEmptyTask = newCheckProgressEmptyTask(d)
176-
}
177-
}
178-
179147
// Each dispatcher status may contain a ACK info or a dispatcher action or both.
180148
// If we get a ack info, we need to check whether the ack is for the current pending ddl event. If so, we can cancel the resend task.
181149
// If we get a dispatcher action, we need to check whether the action is for the current pending ddl event. If so, we can deal the ddl event based on the action.
@@ -228,37 +196,63 @@ func (d *Dispatcher) HandleDispatcherStatus(dispatcherStatus *heartbeatpb.Dispat
228196
}
229197
}
230198

231-
func (d *Dispatcher) HandleEvent(event commonEvent.Event) (block bool) {
232-
switch event.GetType() {
233-
case commonEvent.TypeResolvedEvent:
234-
d.resolvedTs.Set(event.(commonEvent.ResolvedEvent).ResolvedTs)
235-
return false
236-
case commonEvent.TypeDMLEvent:
237-
d.sink.AddDMLEvent(event.(*commonEvent.DMLEvent), d.tableProgress)
238-
return false
239-
case commonEvent.TypeDDLEvent:
240-
event := event.(*commonEvent.DDLEvent)
241-
if d.tableNameStore != nil {
242-
d.tableNameStore.AddEvent(event)
199+
// HandleEvents can batch handle events about resolvedTs Event and DML Event.
200+
// While for DDLEvent and SyncPointEvent, they should be handled singly,
201+
// because they are block events.
202+
// We ensure we only will receive one event when it's ddl event or sync point event by IsBatchable() function.
203+
// When we handle events, we don't have any previous events still in sink.
204+
func (d *Dispatcher) HandleEvents(dispatcherEvents []DispatcherEvent) (block bool) {
205+
onlyResolvedTs := true
206+
for _, dispatcherEvent := range dispatcherEvents {
207+
event := dispatcherEvent.Event
208+
switch event.GetType() {
209+
case commonEvent.TypeResolvedEvent:
210+
d.resolvedTs.Set(event.(commonEvent.ResolvedEvent).ResolvedTs)
211+
case commonEvent.TypeDMLEvent:
212+
onlyResolvedTs = false
213+
event := event.(*commonEvent.DMLEvent)
214+
event.AddPostFlushFunc(func() {
215+
// Considering dml event in sink may be write to downstream not in order,
216+
// thus, we use tableProgress.Empty() to ensure these events are flushed to downstream completely
217+
// and wake dynamic stream to handle the next events.
218+
if d.tableProgress.Empty() {
219+
dispatcherEventDynamicStream := GetDispatcherEventsDynamicStream()
220+
dispatcherEventDynamicStream.Wake() <- event.GetDispatcherID()
221+
}
222+
})
223+
d.sink.AddDMLEvent(event, d.tableProgress)
224+
case commonEvent.TypeDDLEvent:
225+
if len(dispatcherEvents) != 1 {
226+
log.Error("ddl event should only be singly handled", zap.Any("dispatcherID", d.id))
227+
}
228+
onlyResolvedTs = false
229+
230+
event := event.(*commonEvent.DDLEvent)
231+
if d.tableNameStore != nil {
232+
d.tableNameStore.AddEvent(event)
233+
}
234+
event.AddPostFlushFunc(func() {
235+
dispatcherEventDynamicStream := GetDispatcherEventsDynamicStream()
236+
dispatcherEventDynamicStream.Wake() <- event.GetDispatcherID()
237+
})
238+
d.dealWithBlockEvent(event)
239+
case commonEvent.TypeSyncPointEvent:
240+
if len(dispatcherEvents) != 1 {
241+
log.Error("sync point event should only be singly handled", zap.Any("dispatcherID", d.id))
242+
}
243+
onlyResolvedTs = false
244+
event := event.(*commonEvent.SyncPointEvent)
245+
event.AddPostFlushFunc(func() {
246+
dispatcherEventDynamicStream := GetDispatcherEventsDynamicStream()
247+
dispatcherEventDynamicStream.Wake() <- event.GetDispatcherID()
248+
})
249+
d.dealWithBlockEvent(event)
250+
default:
251+
log.Error("invalid event type", zap.Any("event Type", event.GetType()))
252+
return false
243253
}
244-
event.AddPostFlushFunc(func() {
245-
dispatcherEventDynamicStream := GetDispatcherEventsDynamicStream()
246-
dispatcherEventDynamicStream.Wake() <- event.GetDispatcherID()
247-
})
248-
d.addBlockEventToSinkWhenAvailable(event)
249-
return true
250-
case commonEvent.TypeSyncPointEvent:
251-
event := event.(*commonEvent.SyncPointEvent)
252-
event.AddPostFlushFunc(func() {
253-
dispatcherEventDynamicStream := GetDispatcherEventsDynamicStream()
254-
dispatcherEventDynamicStream.Wake() <- event.GetDispatcherID()
255-
})
256-
d.addBlockEventToSinkWhenAvailable(event)
257-
return true
258-
default:
259-
log.Error("invalid event type", zap.Any("event Type", event.GetType()))
260254
}
261-
return false
255+
return !onlyResolvedTs
262256
}
263257

264258
func shouldBlock(event commonEvent.BlockEvent) bool {
@@ -276,7 +270,8 @@ func shouldBlock(event commonEvent.BlockEvent) bool {
276270

277271
// 1.If the event is a single table DDL, it will be added to the sink for writing to downstream(async). If the ddl leads to add new tables or drop tables, it should send heartbeat to maintainer
278272
// 2. If the event is a multi-table DDL / sync point Event, it will generate a TableSpanBlockStatus message with ddl info to send to maintainer.
279-
func (d *Dispatcher) DealWithBlockEventWhenProgressEmpty() {
273+
func (d *Dispatcher) dealWithBlockEvent(event commonEvent.BlockEvent) {
274+
d.blockPendingEvent = event
280275
if !shouldBlock(d.blockPendingEvent) {
281276
d.sink.AddBlockEvent(d.blockPendingEvent, d.tableProgress)
282277
if d.blockPendingEvent.GetNeedAddedTables() != nil || d.blockPendingEvent.GetNeedDroppedTables() != nil {

0 commit comments

Comments
 (0)