diff --git a/downstreamadapter/dispatcher/helper.go b/downstreamadapter/dispatcher/helper.go index 0c5a76124..181d37962 100644 --- a/downstreamadapter/dispatcher/helper.go +++ b/downstreamadapter/dispatcher/helper.go @@ -217,8 +217,8 @@ func (d DispatcherEvent) IsBatchable() bool { return d.isBatchable } -func NewDispatcherEvent(event commonEvent.Event) *DispatcherEvent { - dispatcherEvent := &DispatcherEvent{ +func NewDispatcherEvent(event commonEvent.Event) DispatcherEvent { + dispatcherEvent := DispatcherEvent{ Event: event, } switch event.GetType() { diff --git a/downstreamadapter/dispatchermanager/heartbeat_collector.go b/downstreamadapter/dispatchermanager/heartbeat_collector.go index 53f4cd853..387adb74a 100644 --- a/downstreamadapter/dispatchermanager/heartbeat_collector.go +++ b/downstreamadapter/dispatchermanager/heartbeat_collector.go @@ -127,15 +127,15 @@ func (c *HeartBeatCollector) RecvMessages(_ context.Context, msg *messaging.Targ case messaging.TypeHeartBeatResponse: heartbeatResponse := msg.Message[0].(*heartbeatpb.HeartBeatResponse) heartBeatResponseDynamicStream := GetHeartBeatResponseDynamicStream() - heartBeatResponseDynamicStream.In() <- *NewHeartBeatResponse(heartbeatResponse) + heartBeatResponseDynamicStream.In() <- NewHeartBeatResponse(heartbeatResponse) case messaging.TypeScheduleDispatcherRequest: schedulerDispatcherRequest := msg.Message[0].(*heartbeatpb.ScheduleDispatcherRequest) - c.schedulerDispatcherRequestDynamicStream.In() <- *NewSchedulerDispatcherRequest(schedulerDispatcherRequest) + c.schedulerDispatcherRequestDynamicStream.In() <- NewSchedulerDispatcherRequest(schedulerDispatcherRequest) // TODO: check metrics metrics.HandleDispatcherRequsetCounter.WithLabelValues("default", schedulerDispatcherRequest.ChangefeedID, "receive").Inc() case messaging.TypeCheckpointTsMessage: checkpointTsMessage := msg.Message[0].(*heartbeatpb.CheckpointTsMessage) - c.checkpointTsMessageDynamicStream.In() <- *NewCheckpointTsMessage(checkpointTsMessage) + c.checkpointTsMessageDynamicStream.In() <- NewCheckpointTsMessage(checkpointTsMessage) default: log.Panic("unknown message type", zap.Any("message", msg.Message)) } diff --git a/downstreamadapter/dispatchermanager/helper.go b/downstreamadapter/dispatchermanager/helper.go index 239c37877..409304958 100644 --- a/downstreamadapter/dispatchermanager/helper.go +++ b/downstreamadapter/dispatchermanager/helper.go @@ -78,8 +78,8 @@ func (r SchedulerDispatcherRequest) IsBatchable() bool { return true } -func NewSchedulerDispatcherRequest(req *heartbeatpb.ScheduleDispatcherRequest) *SchedulerDispatcherRequest { - return &SchedulerDispatcherRequest{req} +func NewSchedulerDispatcherRequest(req *heartbeatpb.ScheduleDispatcherRequest) SchedulerDispatcherRequest { + return SchedulerDispatcherRequest{req} } var schedulerDispatcherRequestDynamicStream dynstream.DynamicStream[model.ChangeFeedID, SchedulerDispatcherRequest, *EventDispatcherManager] @@ -107,8 +107,8 @@ func (r HeartBeatResponse) IsBatchable() bool { return true } -func NewHeartBeatResponse(resp *heartbeatpb.HeartBeatResponse) *HeartBeatResponse { - return &HeartBeatResponse{resp} +func NewHeartBeatResponse(resp *heartbeatpb.HeartBeatResponse) HeartBeatResponse { + return HeartBeatResponse{resp} } var heartBeatResponseDynamicStream dynstream.DynamicStream[model.ChangeFeedID, HeartBeatResponse, *EventDispatcherManager] @@ -136,8 +136,8 @@ func (r CheckpointTsMessage) IsBatchable() bool { return true } -func NewCheckpointTsMessage(msg *heartbeatpb.CheckpointTsMessage) *CheckpointTsMessage { - return &CheckpointTsMessage{msg} +func NewCheckpointTsMessage(msg *heartbeatpb.CheckpointTsMessage) CheckpointTsMessage { + return CheckpointTsMessage{msg} } var checkpointTsMessageDynamicStream dynstream.DynamicStream[model.ChangeFeedID, CheckpointTsMessage, *EventDispatcherManager] diff --git a/downstreamadapter/eventcollector/event_collector.go b/downstreamadapter/eventcollector/event_collector.go index 003c4dbd7..219d991f3 100644 --- a/downstreamadapter/eventcollector/event_collector.go +++ b/downstreamadapter/eventcollector/event_collector.go @@ -188,11 +188,11 @@ func (c *EventCollector) RecvEventsMessage(_ context.Context, msg *messaging.Tar case commonEvent.TypeBatchResolvedEvent: for _, e := range event.(*commonEvent.BatchResolvedEvent).Events { c.metricDispatcherReceivedResolvedTsEventCount.Inc() - c.dispatcherEventsDynamicStream.In() <- *dispatcher.NewDispatcherEvent(e) + c.dispatcherEventsDynamicStream.In() <- dispatcher.NewDispatcherEvent(e) } default: c.metricDispatcherReceivedKVEventCount.Inc() - c.dispatcherEventsDynamicStream.In() <- *dispatcher.NewDispatcherEvent(event) + c.dispatcherEventsDynamicStream.In() <- dispatcher.NewDispatcherEvent(event) } } return nil