diff --git a/downstreamadapter/dispatcher/dispatcher.go b/downstreamadapter/dispatcher/dispatcher.go index 194d5dc86..c9914ba4a 100644 --- a/downstreamadapter/dispatcher/dispatcher.go +++ b/downstreamadapter/dispatcher/dispatcher.go @@ -86,8 +86,7 @@ type Dispatcher struct { tableProgress *types.TableProgress - resendTask *ResendTask - checkTableProgressEmptyTask *CheckProgressEmptyTask + resendTask *ResendTask schemaIDToDispatchers *SchemaIDToDispatchers schemaID int64 @@ -145,37 +144,6 @@ func NewDispatcher( return dispatcher } -// 1. 如果是单表内的 ddl,达到下推的条件为: sink 中没有还没执行完的当前表的 event -// 2. 如果是多表内的 ddl 或者是表间的 ddl,则需要满足的条件为: -// 2.1 sink 中没有还没执行完的当前表的 event -// 2.2 maintainer 通知自己可以 write 或者 pass event -// -// TODO:特殊处理有 add index 的逻辑 -// Block Event including ddl Event and Sync Point Event -func (d *Dispatcher) addBlockEventToSinkWhenAvailable(event commonEvent.BlockEvent) { - // 根据 filter 过滤 query 中不需要 send to downstream 的数据 - // 但应当不出现整个 query 都不需要 send to downstream 的 ddl,这种 ddl 不应该发给 dispatcher - // TODO: ddl 影响到的 tableSpan 也在 filter 中过滤一遍 - if event.GetType() == commonEvent.TypeDDLEvent { - ddlEvent := event.(*commonEvent.DDLEvent) - // TODO:看一下这种写法有没有问题,加个测试后面 - err := d.filter.FilterDDLEvent(ddlEvent) - if err != nil { - log.Error("filter ddl query failed", zap.Error(err)) - // 这里怎么处理更合适呢?有错然后反上去让 changefeed 报错 - return - } - } - - d.blockPendingEvent = event - - if d.tableProgress.Empty() { - d.DealWithBlockEventWhenProgressEmpty() - } else { - d.checkTableProgressEmptyTask = newCheckProgressEmptyTask(d) - } -} - // Each dispatcher status may contain a ACK info or a dispatcher action or both. // If we get a ack info, we need to check whether the ack is for the current pending ddl event. If so, we can cancel the resend task. // If we get a dispatcher action, we need to check whether the action is for the current pending ddl event. If so, we can deal the ddl event based on the action. @@ -228,37 +196,63 @@ func (d *Dispatcher) HandleDispatcherStatus(dispatcherStatus *heartbeatpb.Dispat } } -func (d *Dispatcher) HandleEvent(event commonEvent.Event) (block bool) { - switch event.GetType() { - case commonEvent.TypeResolvedEvent: - d.resolvedTs.Set(event.(commonEvent.ResolvedEvent).ResolvedTs) - return false - case commonEvent.TypeDMLEvent: - d.sink.AddDMLEvent(event.(*commonEvent.DMLEvent), d.tableProgress) - return false - case commonEvent.TypeDDLEvent: - event := event.(*commonEvent.DDLEvent) - if d.tableNameStore != nil { - d.tableNameStore.AddEvent(event) +// HandleEvents can batch handle events about resolvedTs Event and DML Event. +// While for DDLEvent and SyncPointEvent, they should be handled singly, +// because they are block events. +// We ensure we only will receive one event when it's ddl event or sync point event by IsBatchable() function. +// When we handle events, we don't have any previous events still in sink. +func (d *Dispatcher) HandleEvents(dispatcherEvents []DispatcherEvent) (block bool) { + onlyResolvedTs := true + for _, dispatcherEvent := range dispatcherEvents { + event := dispatcherEvent.Event + switch event.GetType() { + case commonEvent.TypeResolvedEvent: + d.resolvedTs.Set(event.(commonEvent.ResolvedEvent).ResolvedTs) + case commonEvent.TypeDMLEvent: + onlyResolvedTs = false + event := event.(*commonEvent.DMLEvent) + event.AddPostFlushFunc(func() { + // Considering dml event in sink may be write to downstream not in order, + // thus, we use tableProgress.Empty() to ensure these events are flushed to downstream completely + // and wake dynamic stream to handle the next events. + if d.tableProgress.Empty() { + dispatcherEventDynamicStream := GetDispatcherEventsDynamicStream() + dispatcherEventDynamicStream.Wake() <- event.GetDispatcherID() + } + }) + d.sink.AddDMLEvent(event, d.tableProgress) + case commonEvent.TypeDDLEvent: + if len(dispatcherEvents) != 1 { + log.Error("ddl event should only be singly handled", zap.Any("dispatcherID", d.id)) + } + onlyResolvedTs = false + + event := event.(*commonEvent.DDLEvent) + if d.tableNameStore != nil { + d.tableNameStore.AddEvent(event) + } + event.AddPostFlushFunc(func() { + dispatcherEventDynamicStream := GetDispatcherEventsDynamicStream() + dispatcherEventDynamicStream.Wake() <- event.GetDispatcherID() + }) + d.dealWithBlockEvent(event) + case commonEvent.TypeSyncPointEvent: + if len(dispatcherEvents) != 1 { + log.Error("sync point event should only be singly handled", zap.Any("dispatcherID", d.id)) + } + onlyResolvedTs = false + event := event.(*commonEvent.SyncPointEvent) + event.AddPostFlushFunc(func() { + dispatcherEventDynamicStream := GetDispatcherEventsDynamicStream() + dispatcherEventDynamicStream.Wake() <- event.GetDispatcherID() + }) + d.dealWithBlockEvent(event) + default: + log.Error("invalid event type", zap.Any("event Type", event.GetType())) + return false } - event.AddPostFlushFunc(func() { - dispatcherEventDynamicStream := GetDispatcherEventsDynamicStream() - dispatcherEventDynamicStream.Wake() <- event.GetDispatcherID() - }) - d.addBlockEventToSinkWhenAvailable(event) - return true - case commonEvent.TypeSyncPointEvent: - event := event.(*commonEvent.SyncPointEvent) - event.AddPostFlushFunc(func() { - dispatcherEventDynamicStream := GetDispatcherEventsDynamicStream() - dispatcherEventDynamicStream.Wake() <- event.GetDispatcherID() - }) - d.addBlockEventToSinkWhenAvailable(event) - return true - default: - log.Error("invalid event type", zap.Any("event Type", event.GetType())) } - return false + return !onlyResolvedTs } func shouldBlock(event commonEvent.BlockEvent) bool { @@ -276,7 +270,8 @@ func shouldBlock(event commonEvent.BlockEvent) bool { // 1.If the event is a single table DDL, it will be added to the sink for writing to downstream(async). If the ddl leads to add new tables or drop tables, it should send heartbeat to maintainer // 2. If the event is a multi-table DDL / sync point Event, it will generate a TableSpanBlockStatus message with ddl info to send to maintainer. -func (d *Dispatcher) DealWithBlockEventWhenProgressEmpty() { +func (d *Dispatcher) dealWithBlockEvent(event commonEvent.BlockEvent) { + d.blockPendingEvent = event if !shouldBlock(d.blockPendingEvent) { d.sink.AddBlockEvent(d.blockPendingEvent, d.tableProgress) if d.blockPendingEvent.GetNeedAddedTables() != nil || d.blockPendingEvent.GetNeedDroppedTables() != nil { diff --git a/downstreamadapter/dispatcher/helper.go b/downstreamadapter/dispatcher/helper.go index 071e1aede..0c5a76124 100644 --- a/downstreamadapter/dispatcher/helper.go +++ b/downstreamadapter/dispatcher/helper.go @@ -137,77 +137,6 @@ type HeartBeatInfo struct { IsRemoving bool } -type DispatcherStatusWithID struct { - id common.DispatcherID - status *heartbeatpb.DispatcherStatus -} - -func NewDispatcherStatusWithID(dispatcherStatus *heartbeatpb.DispatcherStatus, dispatcherID common.DispatcherID) DispatcherStatusWithID { - return DispatcherStatusWithID{ - status: dispatcherStatus, - id: dispatcherID, - } -} - -func (d *DispatcherStatusWithID) GetDispatcherStatus() *heartbeatpb.DispatcherStatus { - return d.status -} - -func (d *DispatcherStatusWithID) GetDispatcherID() common.DispatcherID { - return d.id -} - -// DispatcherStatusHandler is used to handle the DispatcherStatus event. -// Each dispatcher status may contain a ACK info or a dispatcher action or both. -// If we get a ack info, we need to check whether the ack is for the current pending ddl event. -// If so, we can cancel the resend task. -// If we get a dispatcher action, we need to check whether the action is for the current pending ddl event. -// If so, we can deal the ddl event based on the action. -// 1. If the action is a write, we need to add the ddl event to the sink for writing to downstream(async). -// 2. If the action is a pass, we just need to pass the event in tableProgress(for correct calculation) and -// wake the dispatcherEventsHandler to handle the event. -type DispatcherStatusHandler struct { -} - -func (h *DispatcherStatusHandler) Path(event DispatcherStatusWithID) common.DispatcherID { - return event.GetDispatcherID() -} - -func (h *DispatcherStatusHandler) Handle(dispatcher *Dispatcher, events ...DispatcherStatusWithID) (await bool) { - for _, event := range events { - dispatcher.HandleDispatcherStatus(event.GetDispatcherStatus()) - } - return false -} - -// CheckTableProgressEmptyTask is reponsible for checking whether the tableProgress is empty. -// If the tableProgress is empty, -// 1. If the event is a single table DDL, it will be added to the sink for writing to downstream(async). -// 2. If the event is a multi-table DDL, it will generate a TableSpanBlockStatus message with ddl info to send to maintainer. -// When the tableProgress is empty, the task will finished after this execution. -// If the tableProgress is not empty, the task will be rescheduled after 10ms. -type CheckProgressEmptyTask struct { - dispatcher *Dispatcher - taskHandle *threadpool.TaskHandle -} - -func newCheckProgressEmptyTask(dispatcher *Dispatcher) *CheckProgressEmptyTask { - taskScheduler := GetDispatcherTaskScheduler() - t := &CheckProgressEmptyTask{ - dispatcher: dispatcher, - } - t.taskHandle = taskScheduler.Submit(t, time.Now().Add(10*time.Millisecond)) - return t -} - -func (t *CheckProgressEmptyTask) Execute() time.Time { - if t.dispatcher.tableProgress.Empty() { - t.dispatcher.DealWithBlockEventWhenProgressEmpty() - return time.Time{} - } - return time.Now().Add(10 * time.Millisecond) -} - // Resend Task is reponsible for resending the TableSpanBlockStatus message with ddl info to maintainer each 50ms. // The task will be cancelled when the the dispatcher received the ack message from the maintainer type ResendTask struct { @@ -235,72 +164,138 @@ func (t *ResendTask) Cancel() { t.taskHandle.Cancel() } +var DispatcherTaskScheduler threadpool.ThreadPool +var dispatcherTaskSchedulerOnce sync.Once + +func GetDispatcherTaskScheduler() threadpool.ThreadPool { + if DispatcherTaskScheduler == nil { + dispatcherTaskSchedulerOnce.Do(func() { + DispatcherTaskScheduler = threadpool.NewThreadPoolDefault() + }) + } + return DispatcherTaskScheduler +} + +func SetDispatcherTaskScheduler(taskScheduler threadpool.ThreadPool) { + DispatcherTaskScheduler = taskScheduler +} + // DispatcherEventsHandler is used to dispatcher the events received. // If the event is a DML event, it will be added to the sink for writing to downstream. // If the event is a resolved TS event, it will be update the resolvedTs of the dispatcher. // If the event is a DDL event, -// 1. If it is a single table DDL, -// a. If the tableProgress is empty(previous events are flushed successfully),it will be added to the sink for writing to downstream(async). -// b. If the tableProgress is not empty, we will generate a CheckTableProgressEmptyTask to periodly check whether the tableProgress is empty, -// and then add the DDL event to the sink for writing to downstream(async). -// 2. If it is a multi-table DDL, -// a. If the tableProgress is empty(previous events are flushed successfully),We will generate a TableSpanBlockStatus message with ddl info to send to maintainer. -// b. If the tableProgress is not empty, we will generate a CheckTableProgressEmptyTask to periodly check whether the tableProgress is empty, -// and then we will generate a TableSpanBlockStatus message with ddl info to send to maintainer. -// for the multi-table DDL, we will also generate a ResendTask to resend the TableSpanBlockStatus message with ddl info to maintainer each 50ms to avoid message is missing. +// 1. If it is a single table DDL, it will be added to the sink for writing to downstream(async). +// 2. If it is a multi-table DDL, We will generate a TableSpanBlockStatus message with ddl info to send to maintainer. +// for the multi-table DDL, we will also generate a ResendTask to resend the TableSpanBlockStatus message with ddl info +// to maintainer each 200ms to avoid message is missing. // -// Considering for ddl event, we always do an async write, so we need to be blocked before the ddl event flushed to downstream successfully. -// Thus, we add a callback function to let the hander be waked when the ddl event flushed to downstream successfully. - +// If the event is a Sync Point event, we deal it as a multi-table DDL event. +// +// We can handle multi events in batch if there only dml events and resovledTs events. +// For DDL event and Sync Point Event, we should handle them singlely. +// Thus, if a event is DDL event or Sync Point Event, we will only get one event at once. +// Otherwise, we can get a batch events. +// We always return block = true for Handle() except we only receive the resolvedTs events. +// So we only will reach next Handle() when previous events are all push downstream successfully. type DispatcherEventsHandler struct { } -func (h *DispatcherEventsHandler) Path(event commonEvent.Event) common.DispatcherID { +func (h *DispatcherEventsHandler) Path(event DispatcherEvent) common.DispatcherID { return event.GetDispatcherID() } -// TODO: 这个后面需要按照更大的粒度进行攒批 -func (h *DispatcherEventsHandler) Handle(dispatcher *Dispatcher, event ...commonEvent.Event) bool { - if len(event) != 1 { - // TODO: Handle batch events - panic("only one event is allowed") - } - return dispatcher.HandleEvent(event[0]) +func (h *DispatcherEventsHandler) Handle(dispatcher *Dispatcher, events ...DispatcherEvent) bool { + return dispatcher.HandleEvents(events) } -var DispatcherTaskScheduler threadpool.ThreadPool -var dispatcherTaskSchedulerOnce sync.Once +type DispatcherEvent struct { + commonEvent.Event + isBatchable bool +} -func GetDispatcherTaskScheduler() threadpool.ThreadPool { - if DispatcherTaskScheduler == nil { - dispatcherTaskSchedulerOnce.Do(func() { - DispatcherTaskScheduler = threadpool.NewThreadPoolDefault() - }) - } - return DispatcherTaskScheduler +func (d DispatcherEvent) IsBatchable() bool { + return d.isBatchable } -func SetDispatcherTaskScheduler(taskScheduler threadpool.ThreadPool) { - DispatcherTaskScheduler = taskScheduler +func NewDispatcherEvent(event commonEvent.Event) *DispatcherEvent { + dispatcherEvent := &DispatcherEvent{ + Event: event, + } + switch event.GetType() { + case commonEvent.TypeResolvedEvent, commonEvent.TypeDMLEvent: + dispatcherEvent.isBatchable = true + case commonEvent.TypeDDLEvent, commonEvent.TypeSyncPointEvent: + dispatcherEvent.isBatchable = false + default: + log.Error("unknown event type", zap.Int("type", int(event.GetType()))) + } + return dispatcherEvent } -var dispatcherEventsDynamicStream dynstream.DynamicStream[common.DispatcherID, commonEvent.Event, *Dispatcher] +var dispatcherEventsDynamicStream dynstream.DynamicStream[common.DispatcherID, DispatcherEvent, *Dispatcher] var dispatcherEventsDynamicStreamOnce sync.Once -func GetDispatcherEventsDynamicStream() dynstream.DynamicStream[common.DispatcherID, commonEvent.Event, *Dispatcher] { +func GetDispatcherEventsDynamicStream() dynstream.DynamicStream[common.DispatcherID, DispatcherEvent, *Dispatcher] { if dispatcherEventsDynamicStream == nil { dispatcherEventsDynamicStreamOnce.Do(func() { - dispatcherEventsDynamicStream = dynstream.NewDynamicStream(&DispatcherEventsHandler{}) + dispatcherEventsDynamicStream = dynstream.NewDynamicStream(&DispatcherEventsHandler{}, dynstream.NewOptionWithBatchSize(128)) dispatcherEventsDynamicStream.Start() }) } return dispatcherEventsDynamicStream } -func SetDispatcherEventsDynamicStream(dynamicStream dynstream.DynamicStream[common.DispatcherID, commonEvent.Event, *Dispatcher]) { +func SetDispatcherEventsDynamicStream(dynamicStream dynstream.DynamicStream[common.DispatcherID, DispatcherEvent, *Dispatcher]) { dispatcherEventsDynamicStream = dynamicStream } +type DispatcherStatusWithID struct { + id common.DispatcherID + status *heartbeatpb.DispatcherStatus +} + +func NewDispatcherStatusWithID(dispatcherStatus *heartbeatpb.DispatcherStatus, dispatcherID common.DispatcherID) DispatcherStatusWithID { + return DispatcherStatusWithID{ + status: dispatcherStatus, + id: dispatcherID, + } +} + +func (d *DispatcherStatusWithID) GetDispatcherStatus() *heartbeatpb.DispatcherStatus { + return d.status +} + +func (d *DispatcherStatusWithID) GetDispatcherID() common.DispatcherID { + return d.id +} + +func (d DispatcherStatusWithID) IsBatchable() bool { + return true +} + +// DispatcherStatusHandler is used to handle the DispatcherStatus event. +// Each dispatcher status may contain a ACK info or a dispatcher action or both. +// If we get a ack info, we need to check whether the ack is for the current pending ddl event. +// If so, we can cancel the resend task. +// If we get a dispatcher action, we need to check whether the action is for the current pending ddl event. +// If so, we can deal the ddl event based on the action. +// 1. If the action is a write, we need to add the ddl event to the sink for writing to downstream(async). +// 2. If the action is a pass, we just need to pass the event in tableProgress(for correct calculation) and +// wake the dispatcherEventsHandler to handle the event. +type DispatcherStatusHandler struct { +} + +func (h *DispatcherStatusHandler) Path(event DispatcherStatusWithID) common.DispatcherID { + return event.GetDispatcherID() +} + +func (h *DispatcherStatusHandler) Handle(dispatcher *Dispatcher, events ...DispatcherStatusWithID) (await bool) { + for _, event := range events { + dispatcher.HandleDispatcherStatus(event.GetDispatcherStatus()) + } + return false +} + var dispatcherStatusDynamicStream dynstream.DynamicStream[common.DispatcherID, DispatcherStatusWithID, *Dispatcher] var dispatcherStatusDynamicStreamOnce sync.Once diff --git a/downstreamadapter/dispatchermanager/heartbeat_collector.go b/downstreamadapter/dispatchermanager/heartbeat_collector.go index 02355f917..53f4cd853 100644 --- a/downstreamadapter/dispatchermanager/heartbeat_collector.go +++ b/downstreamadapter/dispatchermanager/heartbeat_collector.go @@ -43,9 +43,9 @@ type HeartBeatCollector struct { heartBeatReqQueue *HeartbeatRequestQueue blockStatusReqQueue *BlockStatusRequestQueue - heartBeatResponseDynamicStream dynstream.DynamicStream[model.ChangeFeedID, *heartbeatpb.HeartBeatResponse, *EventDispatcherManager] - schedulerDispatcherRequestDynamicStream dynstream.DynamicStream[model.ChangeFeedID, *heartbeatpb.ScheduleDispatcherRequest, *EventDispatcherManager] - checkpointTsMessageDynamicStream dynstream.DynamicStream[model.ChangeFeedID, *heartbeatpb.CheckpointTsMessage, *EventDispatcherManager] + heartBeatResponseDynamicStream dynstream.DynamicStream[model.ChangeFeedID, HeartBeatResponse, *EventDispatcherManager] + schedulerDispatcherRequestDynamicStream dynstream.DynamicStream[model.ChangeFeedID, SchedulerDispatcherRequest, *EventDispatcherManager] + checkpointTsMessageDynamicStream dynstream.DynamicStream[model.ChangeFeedID, CheckpointTsMessage, *EventDispatcherManager] mc messaging.MessageCenter } @@ -127,15 +127,15 @@ func (c *HeartBeatCollector) RecvMessages(_ context.Context, msg *messaging.Targ case messaging.TypeHeartBeatResponse: heartbeatResponse := msg.Message[0].(*heartbeatpb.HeartBeatResponse) heartBeatResponseDynamicStream := GetHeartBeatResponseDynamicStream() - heartBeatResponseDynamicStream.In() <- heartbeatResponse + heartBeatResponseDynamicStream.In() <- *NewHeartBeatResponse(heartbeatResponse) case messaging.TypeScheduleDispatcherRequest: - scheduleDispatcherRequest := msg.Message[0].(*heartbeatpb.ScheduleDispatcherRequest) - c.schedulerDispatcherRequestDynamicStream.In() <- scheduleDispatcherRequest + schedulerDispatcherRequest := msg.Message[0].(*heartbeatpb.ScheduleDispatcherRequest) + c.schedulerDispatcherRequestDynamicStream.In() <- *NewSchedulerDispatcherRequest(schedulerDispatcherRequest) // TODO: check metrics - metrics.HandleDispatcherRequsetCounter.WithLabelValues("default", scheduleDispatcherRequest.ChangefeedID, "receive").Inc() + metrics.HandleDispatcherRequsetCounter.WithLabelValues("default", schedulerDispatcherRequest.ChangefeedID, "receive").Inc() case messaging.TypeCheckpointTsMessage: checkpointTsMessage := msg.Message[0].(*heartbeatpb.CheckpointTsMessage) - c.checkpointTsMessageDynamicStream.In() <- checkpointTsMessage + c.checkpointTsMessageDynamicStream.In() <- *NewCheckpointTsMessage(checkpointTsMessage) default: log.Panic("unknown message type", zap.Any("message", msg.Message)) } @@ -149,17 +149,17 @@ func (c *HeartBeatCollector) Close() { type SchedulerDispatcherRequestHandler struct { } -func (h *SchedulerDispatcherRequestHandler) Path(scheduleDispatcherRequest *heartbeatpb.ScheduleDispatcherRequest) model.ChangeFeedID { +func (h *SchedulerDispatcherRequestHandler) Path(scheduleDispatcherRequest SchedulerDispatcherRequest) model.ChangeFeedID { return model.DefaultChangeFeedID(scheduleDispatcherRequest.ChangefeedID) } -func (h *SchedulerDispatcherRequestHandler) Handle(eventDispatcherManager *EventDispatcherManager, reqs ...*heartbeatpb.ScheduleDispatcherRequest) bool { +func (h *SchedulerDispatcherRequestHandler) Handle(eventDispatcherManager *EventDispatcherManager, reqs ...SchedulerDispatcherRequest) bool { if len(reqs) != 1 { // TODO: Support batch panic("invalid request count") } scheduleDispatcherRequest := reqs[0] - if scheduleDispatcherRequest == nil { + if scheduleDispatcherRequest.ScheduleDispatcherRequest == nil { log.Warn("scheduleDispatcherRequest is nil, skip") return false } @@ -184,11 +184,11 @@ func NewHeartBeatResponseHandler() HeartBeatResponseHandler { return HeartBeatResponseHandler{dispatcherStatusDynamicStream: dispatcher.GetDispatcherStatusDynamicStream()} } -func (h *HeartBeatResponseHandler) Path(HeartbeatResponse *heartbeatpb.HeartBeatResponse) model.ChangeFeedID { +func (h *HeartBeatResponseHandler) Path(HeartbeatResponse HeartBeatResponse) model.ChangeFeedID { return model.DefaultChangeFeedID(HeartbeatResponse.ChangefeedID) } -func (h *HeartBeatResponseHandler) Handle(eventDispatcherManager *EventDispatcherManager, resps ...*heartbeatpb.HeartBeatResponse) bool { +func (h *HeartBeatResponseHandler) Handle(eventDispatcherManager *EventDispatcherManager, resps ...HeartBeatResponse) bool { if len(resps) != 1 { // TODO: Support batch panic("invalid response count") @@ -229,11 +229,11 @@ func NewCheckpointTsMessageHandler() CheckpointTsMessageHandler { return CheckpointTsMessageHandler{} } -func (h *CheckpointTsMessageHandler) Path(checkpointTsMessage *heartbeatpb.CheckpointTsMessage) model.ChangeFeedID { +func (h *CheckpointTsMessageHandler) Path(checkpointTsMessage CheckpointTsMessage) model.ChangeFeedID { return model.DefaultChangeFeedID(checkpointTsMessage.ChangefeedID) } -func (h *CheckpointTsMessageHandler) Handle(eventDispatcherManager *EventDispatcherManager, messages ...*heartbeatpb.CheckpointTsMessage) bool { +func (h *CheckpointTsMessageHandler) Handle(eventDispatcherManager *EventDispatcherManager, messages ...CheckpointTsMessage) bool { if len(messages) != 1 { // TODO: Support batch panic("invalid message count") diff --git a/downstreamadapter/dispatchermanager/helper.go b/downstreamadapter/dispatchermanager/helper.go index de7468af0..239c37877 100644 --- a/downstreamadapter/dispatchermanager/helper.go +++ b/downstreamadapter/dispatchermanager/helper.go @@ -70,10 +70,22 @@ func SetHeartBeatTaskScheduler(taskScheduler threadpool.ThreadPool) { heartBeatTaskScheduler = taskScheduler } -var schedulerDispatcherRequestDynamicStream dynstream.DynamicStream[model.ChangeFeedID, *heartbeatpb.ScheduleDispatcherRequest, *EventDispatcherManager] +type SchedulerDispatcherRequest struct { + *heartbeatpb.ScheduleDispatcherRequest +} + +func (r SchedulerDispatcherRequest) IsBatchable() bool { + return true +} + +func NewSchedulerDispatcherRequest(req *heartbeatpb.ScheduleDispatcherRequest) *SchedulerDispatcherRequest { + return &SchedulerDispatcherRequest{req} +} + +var schedulerDispatcherRequestDynamicStream dynstream.DynamicStream[model.ChangeFeedID, SchedulerDispatcherRequest, *EventDispatcherManager] var schedulerDispatcherRequestDynamicStreamOnce sync.Once -func GetSchedulerDispatcherRequestDynamicStream() dynstream.DynamicStream[model.ChangeFeedID, *heartbeatpb.ScheduleDispatcherRequest, *EventDispatcherManager] { +func GetSchedulerDispatcherRequestDynamicStream() dynstream.DynamicStream[model.ChangeFeedID, SchedulerDispatcherRequest, *EventDispatcherManager] { if schedulerDispatcherRequestDynamicStream == nil { schedulerDispatcherRequestDynamicStreamOnce.Do(func() { schedulerDispatcherRequestDynamicStream = dynstream.NewDynamicStream(&SchedulerDispatcherRequestHandler{}) @@ -83,14 +95,26 @@ func GetSchedulerDispatcherRequestDynamicStream() dynstream.DynamicStream[model. return schedulerDispatcherRequestDynamicStream } -func SetSchedulerDispatcherRequestDynamicStream(dynamicStream dynstream.DynamicStream[model.ChangeFeedID, *heartbeatpb.ScheduleDispatcherRequest, *EventDispatcherManager]) { +func SetSchedulerDispatcherRequestDynamicStream(dynamicStream dynstream.DynamicStream[model.ChangeFeedID, SchedulerDispatcherRequest, *EventDispatcherManager]) { schedulerDispatcherRequestDynamicStream = dynamicStream } -var heartBeatResponseDynamicStream dynstream.DynamicStream[model.ChangeFeedID, *heartbeatpb.HeartBeatResponse, *EventDispatcherManager] +type HeartBeatResponse struct { + *heartbeatpb.HeartBeatResponse +} + +func (r HeartBeatResponse) IsBatchable() bool { + return true +} + +func NewHeartBeatResponse(resp *heartbeatpb.HeartBeatResponse) *HeartBeatResponse { + return &HeartBeatResponse{resp} +} + +var heartBeatResponseDynamicStream dynstream.DynamicStream[model.ChangeFeedID, HeartBeatResponse, *EventDispatcherManager] var heartBeatResponseDynamicStreamOnce sync.Once -func GetHeartBeatResponseDynamicStream() dynstream.DynamicStream[model.ChangeFeedID, *heartbeatpb.HeartBeatResponse, *EventDispatcherManager] { +func GetHeartBeatResponseDynamicStream() dynstream.DynamicStream[model.ChangeFeedID, HeartBeatResponse, *EventDispatcherManager] { if heartBeatResponseDynamicStream == nil { heartBeatResponseDynamicStreamOnce.Do(func() { heartBeatResponseDynamicStream = dynstream.NewDynamicStream(&HeartBeatResponseHandler{dispatcher.GetDispatcherStatusDynamicStream()}) @@ -100,14 +124,26 @@ func GetHeartBeatResponseDynamicStream() dynstream.DynamicStream[model.ChangeFee return heartBeatResponseDynamicStream } -func SetHeartBeatResponseDynamicStream(dynamicStream dynstream.DynamicStream[model.ChangeFeedID, *heartbeatpb.HeartBeatResponse, *EventDispatcherManager]) { +func SetHeartBeatResponseDynamicStream(dynamicStream dynstream.DynamicStream[model.ChangeFeedID, HeartBeatResponse, *EventDispatcherManager]) { heartBeatResponseDynamicStream = dynamicStream } -var checkpointTsMessageDynamicStream dynstream.DynamicStream[model.ChangeFeedID, *heartbeatpb.CheckpointTsMessage, *EventDispatcherManager] +type CheckpointTsMessage struct { + *heartbeatpb.CheckpointTsMessage +} + +func (r CheckpointTsMessage) IsBatchable() bool { + return true +} + +func NewCheckpointTsMessage(msg *heartbeatpb.CheckpointTsMessage) *CheckpointTsMessage { + return &CheckpointTsMessage{msg} +} + +var checkpointTsMessageDynamicStream dynstream.DynamicStream[model.ChangeFeedID, CheckpointTsMessage, *EventDispatcherManager] var checkpointTsMessageDynamicStreamOnce sync.Once -func GetCheckpointTsMessageDynamicStream() dynstream.DynamicStream[model.ChangeFeedID, *heartbeatpb.CheckpointTsMessage, *EventDispatcherManager] { +func GetCheckpointTsMessageDynamicStream() dynstream.DynamicStream[model.ChangeFeedID, CheckpointTsMessage, *EventDispatcherManager] { if checkpointTsMessageDynamicStream == nil { checkpointTsMessageDynamicStreamOnce.Do(func() { checkpointTsMessageDynamicStream = dynstream.NewDynamicStream(&CheckpointTsMessageHandler{}) diff --git a/downstreamadapter/eventcollector/event_collector.go b/downstreamadapter/eventcollector/event_collector.go index deef68b3e..003c4dbd7 100644 --- a/downstreamadapter/eventcollector/event_collector.go +++ b/downstreamadapter/eventcollector/event_collector.go @@ -75,7 +75,7 @@ type EventCollector struct { mc messaging.MessageCenter wg sync.WaitGroup - dispatcherEventsDynamicStream dynstream.DynamicStream[common.DispatcherID, commonEvent.Event, *dispatcher.Dispatcher] + dispatcherEventsDynamicStream dynstream.DynamicStream[common.DispatcherID, dispatcher.DispatcherEvent, *dispatcher.Dispatcher] registerMessageChan *chann.DrainableChann[RegisterInfo] // for temp metricDispatcherReceivedKVEventCount prometheus.Counter @@ -188,11 +188,11 @@ func (c *EventCollector) RecvEventsMessage(_ context.Context, msg *messaging.Tar case commonEvent.TypeBatchResolvedEvent: for _, e := range event.(*commonEvent.BatchResolvedEvent).Events { c.metricDispatcherReceivedResolvedTsEventCount.Inc() - c.dispatcherEventsDynamicStream.In() <- e + c.dispatcherEventsDynamicStream.In() <- *dispatcher.NewDispatcherEvent(e) } default: c.metricDispatcherReceivedKVEventCount.Inc() - c.dispatcherEventsDynamicStream.In() <- event + c.dispatcherEventsDynamicStream.In() <- *dispatcher.NewDispatcherEvent(event) } } return nil diff --git a/downstreamadapter/sink/types/table_progress.go b/downstreamadapter/sink/types/table_progress.go index 4cbda8194..357513a63 100644 --- a/downstreamadapter/sink/types/table_progress.go +++ b/downstreamadapter/sink/types/table_progress.go @@ -56,7 +56,7 @@ func (p *TableProgress) Add(event commonEvent.FlushableEvent) { elem := p.list.PushBack(ts) p.elemMap[ts] = elem p.maxCommitTs = event.GetCommitTs() - event.AddPostFlushFunc(func() { p.Remove(event) }) + event.PushFrontFlushFunc(func() { p.Remove(event) }) } // 而且删除可以认为是批量的?但要不要做成批量可以后面再看 diff --git a/maintainer/maintainer_event.go b/maintainer/maintainer_event.go index 06d4022fc..6e1034f24 100644 --- a/maintainer/maintainer_event.go +++ b/maintainer/maintainer_event.go @@ -40,6 +40,10 @@ type Event struct { dispatcherEvent *InternalScheduleDispatcherEvent } +func (e Event) IsBatchable() bool { + return true +} + // SubmitScheduledEvent submits a task to controller pool to send a future event func SubmitScheduledEvent( scheduler threadpool.ThreadPool, diff --git a/pkg/common/event/ddl_event.go b/pkg/common/event/ddl_event.go index ecb9144eb..493c64f94 100644 --- a/pkg/common/event/ddl_event.go +++ b/pkg/common/event/ddl_event.go @@ -73,6 +73,10 @@ func (d *DDLEvent) AddPostFlushFunc(f func()) { d.PostTxnFlushed = append(d.PostTxnFlushed, f) } +func (d *DDLEvent) PushFrontFlushFunc(f func()) { + d.PostTxnFlushed = append([]func(){f}, d.PostTxnFlushed...) +} + func (e *DDLEvent) GetBlockedTables() *InfluencedTables { return e.BlockedTables } diff --git a/pkg/common/event/dml_event.go b/pkg/common/event/dml_event.go index 3eb64d5f3..88a75cc0d 100644 --- a/pkg/common/event/dml_event.go +++ b/pkg/common/event/dml_event.go @@ -96,6 +96,10 @@ func (t *DMLEvent) PostFlush() { } } +func (t *DMLEvent) PushFrontFlushFunc(f func()) { + t.PostTxnFlushed = append([]func(){f}, t.PostTxnFlushed...) +} + func (t *DMLEvent) AddPostFlushFunc(f func()) { t.PostTxnFlushed = append(t.PostTxnFlushed, f) } diff --git a/pkg/common/event/interface.go b/pkg/common/event/interface.go index e3ef2ecf5..f9819c442 100644 --- a/pkg/common/event/interface.go +++ b/pkg/common/event/interface.go @@ -21,6 +21,7 @@ type FlushableEvent interface { Event PostFlush() AddPostFlushFunc(func()) + PushFrontFlushFunc(f func()) } // BlockEvent is an event that may be blocked the dispatcher. diff --git a/pkg/common/event/sync_point_event.go b/pkg/common/event/sync_point_event.go index f3712cbff..55725f20c 100644 --- a/pkg/common/event/sync_point_event.go +++ b/pkg/common/event/sync_point_event.go @@ -70,3 +70,7 @@ func (e *SyncPointEvent) PostFlush() { func (e *SyncPointEvent) AddPostFlushFunc(f func()) { e.PostTxnFlushed = append(e.PostTxnFlushed, f) } + +func (e *SyncPointEvent) PushFrontFlushFunc(f func()) { + e.PostTxnFlushed = append([]func(){f}, e.PostTxnFlushed...) +} diff --git a/pkg/eventservice/event_broker.go b/pkg/eventservice/event_broker.go index d8c97bb2c..fbd2f8eeb 100644 --- a/pkg/eventservice/event_broker.go +++ b/pkg/eventservice/event_broker.go @@ -776,6 +776,10 @@ func (t *scanTask) handle() { metricScanTaskQueueDuration.Observe(float64(time.Since(t.createTime).Milliseconds())) } +func (t scanTask) IsBatchable() bool { + return true +} + type scanTaskPool struct { // pendingTaskQueue is used to store the tasks that are waiting to be handled by the scan workers. // The length of the pendingTaskQueue is equal to the number of the scan workers. diff --git a/utils/dynstream/interfaces.go b/utils/dynstream/interfaces.go index d22f864ed..7fc484c90 100644 --- a/utils/dynstream/interfaces.go +++ b/utils/dynstream/interfaces.go @@ -10,7 +10,11 @@ import ( type Path comparable // An event belongs to a path. -type Event any +type Event interface { + // returns true if the event could be batched with other events, + // returns false which means the event should be called handled singly + IsBatchable() bool +} // A destination is the place where the event is sent to. type Dest any @@ -112,6 +116,12 @@ func NewOption() Option { } } +func NewOptionWithBatchSize(batchSize int) Option { + opt := NewOption() + opt.BatchSize = batchSize + return opt +} + func (o *Option) fix() { if o.StreamCount == 0 { o.StreamCount = runtime.NumCPU() diff --git a/utils/dynstream/stream.go b/utils/dynstream/stream.go index 1b435dc2e..6d73ed685 100644 --- a/utils/dynstream/stream.go +++ b/utils/dynstream/stream.go @@ -338,13 +338,25 @@ Loop: handleCount := min(signal.eventCount, s.option.BatchSize) for i := 0; i < handleCount; i++ { - e, ok := signal.pathInfo.pendingQueue.PopFront() + e, ok := signal.pathInfo.pendingQueue.Front() if !ok { // The signal could contain more events than the pendingQueue, // which is possible when the path is removed or recovered from blocked. break } - eventBuf = append(eventBuf, e) + + if e.IsBatchable() { + signal.pathInfo.pendingQueue.PopFront() + eventBuf = append(eventBuf, e) + } else { + // If the event is non-batchable, we should handle it singly. + // The non-batchable event should be the only event in a batch. + if i == 0 { + eventBuf = append(eventBuf, e) + signal.pathInfo.pendingQueue.PopFront() + } + break + } } actualCount := len(eventBuf)