Skip to content

Commit aa7fb72

Browse files
authored
use struct instread of point for dynamic stream's events (#374)
1 parent cba49f8 commit aa7fb72

File tree

4 files changed

+13
-13
lines changed

4 files changed

+13
-13
lines changed

downstreamadapter/dispatcher/helper.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -217,8 +217,8 @@ func (d DispatcherEvent) IsBatchable() bool {
217217
return d.isBatchable
218218
}
219219

220-
func NewDispatcherEvent(event commonEvent.Event) *DispatcherEvent {
221-
dispatcherEvent := &DispatcherEvent{
220+
func NewDispatcherEvent(event commonEvent.Event) DispatcherEvent {
221+
dispatcherEvent := DispatcherEvent{
222222
Event: event,
223223
}
224224
switch event.GetType() {

downstreamadapter/dispatchermanager/heartbeat_collector.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -127,15 +127,15 @@ func (c *HeartBeatCollector) RecvMessages(_ context.Context, msg *messaging.Targ
127127
case messaging.TypeHeartBeatResponse:
128128
heartbeatResponse := msg.Message[0].(*heartbeatpb.HeartBeatResponse)
129129
heartBeatResponseDynamicStream := GetHeartBeatResponseDynamicStream()
130-
heartBeatResponseDynamicStream.In() <- *NewHeartBeatResponse(heartbeatResponse)
130+
heartBeatResponseDynamicStream.In() <- NewHeartBeatResponse(heartbeatResponse)
131131
case messaging.TypeScheduleDispatcherRequest:
132132
schedulerDispatcherRequest := msg.Message[0].(*heartbeatpb.ScheduleDispatcherRequest)
133-
c.schedulerDispatcherRequestDynamicStream.In() <- *NewSchedulerDispatcherRequest(schedulerDispatcherRequest)
133+
c.schedulerDispatcherRequestDynamicStream.In() <- NewSchedulerDispatcherRequest(schedulerDispatcherRequest)
134134
// TODO: check metrics
135135
metrics.HandleDispatcherRequsetCounter.WithLabelValues("default", schedulerDispatcherRequest.ChangefeedID, "receive").Inc()
136136
case messaging.TypeCheckpointTsMessage:
137137
checkpointTsMessage := msg.Message[0].(*heartbeatpb.CheckpointTsMessage)
138-
c.checkpointTsMessageDynamicStream.In() <- *NewCheckpointTsMessage(checkpointTsMessage)
138+
c.checkpointTsMessageDynamicStream.In() <- NewCheckpointTsMessage(checkpointTsMessage)
139139
default:
140140
log.Panic("unknown message type", zap.Any("message", msg.Message))
141141
}

downstreamadapter/dispatchermanager/helper.go

+6-6
Original file line numberDiff line numberDiff line change
@@ -78,8 +78,8 @@ func (r SchedulerDispatcherRequest) IsBatchable() bool {
7878
return true
7979
}
8080

81-
func NewSchedulerDispatcherRequest(req *heartbeatpb.ScheduleDispatcherRequest) *SchedulerDispatcherRequest {
82-
return &SchedulerDispatcherRequest{req}
81+
func NewSchedulerDispatcherRequest(req *heartbeatpb.ScheduleDispatcherRequest) SchedulerDispatcherRequest {
82+
return SchedulerDispatcherRequest{req}
8383
}
8484

8585
var schedulerDispatcherRequestDynamicStream dynstream.DynamicStream[model.ChangeFeedID, SchedulerDispatcherRequest, *EventDispatcherManager]
@@ -107,8 +107,8 @@ func (r HeartBeatResponse) IsBatchable() bool {
107107
return true
108108
}
109109

110-
func NewHeartBeatResponse(resp *heartbeatpb.HeartBeatResponse) *HeartBeatResponse {
111-
return &HeartBeatResponse{resp}
110+
func NewHeartBeatResponse(resp *heartbeatpb.HeartBeatResponse) HeartBeatResponse {
111+
return HeartBeatResponse{resp}
112112
}
113113

114114
var heartBeatResponseDynamicStream dynstream.DynamicStream[model.ChangeFeedID, HeartBeatResponse, *EventDispatcherManager]
@@ -136,8 +136,8 @@ func (r CheckpointTsMessage) IsBatchable() bool {
136136
return true
137137
}
138138

139-
func NewCheckpointTsMessage(msg *heartbeatpb.CheckpointTsMessage) *CheckpointTsMessage {
140-
return &CheckpointTsMessage{msg}
139+
func NewCheckpointTsMessage(msg *heartbeatpb.CheckpointTsMessage) CheckpointTsMessage {
140+
return CheckpointTsMessage{msg}
141141
}
142142

143143
var checkpointTsMessageDynamicStream dynstream.DynamicStream[model.ChangeFeedID, CheckpointTsMessage, *EventDispatcherManager]

downstreamadapter/eventcollector/event_collector.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -188,11 +188,11 @@ func (c *EventCollector) RecvEventsMessage(_ context.Context, msg *messaging.Tar
188188
case commonEvent.TypeBatchResolvedEvent:
189189
for _, e := range event.(*commonEvent.BatchResolvedEvent).Events {
190190
c.metricDispatcherReceivedResolvedTsEventCount.Inc()
191-
c.dispatcherEventsDynamicStream.In() <- *dispatcher.NewDispatcherEvent(e)
191+
c.dispatcherEventsDynamicStream.In() <- dispatcher.NewDispatcherEvent(e)
192192
}
193193
default:
194194
c.metricDispatcherReceivedKVEventCount.Inc()
195-
c.dispatcherEventsDynamicStream.In() <- *dispatcher.NewDispatcherEvent(event)
195+
c.dispatcherEventsDynamicStream.In() <- dispatcher.NewDispatcherEvent(event)
196196
}
197197
}
198198
return nil

0 commit comments

Comments
 (0)