Skip to content

Commit 03b5ecd

Browse files
committed
update
1 parent 24682c1 commit 03b5ecd

File tree

3 files changed

+17
-9
lines changed

3 files changed

+17
-9
lines changed

downstreamadapter/dispatcher/helper.go

+3-1
Original file line numberDiff line numberDiff line change
@@ -223,9 +223,11 @@ func NewDispatcherEvent(event commonEvent.Event) *DispatcherEvent {
223223
}
224224
switch event.GetType() {
225225
case commonEvent.TypeResolvedEvent:
226+
dispatcherEvent.isBatchable = true
226227
case commonEvent.TypeDMLEvent:
227228
dispatcherEvent.isBatchable = true
228229
case commonEvent.TypeDDLEvent:
230+
dispatcherEvent.isBatchable = false
229231
case commonEvent.TypeSyncPointEvent:
230232
dispatcherEvent.isBatchable = false
231233
default:
@@ -240,7 +242,7 @@ var dispatcherEventsDynamicStreamOnce sync.Once
240242
func GetDispatcherEventsDynamicStream() dynstream.DynamicStream[common.DispatcherID, DispatcherEvent, *Dispatcher] {
241243
if dispatcherEventsDynamicStream == nil {
242244
dispatcherEventsDynamicStreamOnce.Do(func() {
243-
dispatcherEventsDynamicStream = dynstream.NewDynamicStream(&DispatcherEventsHandler{})
245+
dispatcherEventsDynamicStream = dynstream.NewDynamicStream(&DispatcherEventsHandler{}, dynstream.NewOptionWithBatchSize(128))
244246
dispatcherEventsDynamicStream.Start()
245247
})
246248
}

utils/dynstream/interfaces.go

+6
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,12 @@ func NewOption() Option {
116116
}
117117
}
118118

119+
func NewOptionWithBatchSize(batchSize int) Option {
120+
opt := NewOption()
121+
opt.BatchSize = batchSize
122+
return opt
123+
}
124+
119125
func (o *Option) fix() {
120126
if o.StreamCount == 0 {
121127
o.StreamCount = runtime.NumCPU()

utils/dynstream/stream.go

+8-8
Original file line numberDiff line numberDiff line change
@@ -344,19 +344,19 @@ Loop:
344344
// which is possible when the path is removed or recovered from blocked.
345345
break
346346
}
347-
// If the event is non-batchable, we should handle it singly.
348-
// The non-batchable event should be the only event in a batch.
349-
if !e.IsBatchable() {
347+
348+
if e.IsBatchable() {
349+
signal.pathInfo.pendingQueue.PopFront()
350+
eventBuf = append(eventBuf, e)
351+
} else {
352+
// If the event is non-batchable, we should handle it singly.
353+
// The non-batchable event should be the only event in a batch.
350354
if i == 0 {
351355
eventBuf = append(eventBuf, e)
352356
signal.pathInfo.pendingQueue.PopFront()
353-
break
354-
} else {
355-
break
356357
}
358+
break
357359
}
358-
signal.pathInfo.pendingQueue.PopFront()
359-
eventBuf = append(eventBuf, e)
360360
}
361361

362362
actualCount := len(eventBuf)

0 commit comments

Comments
 (0)