Skip to content

Commit 722903e

Browse files
authored
logservice: fix dynamic stream handler and optimize eventstore cpu usage (#365)
* avoid heap allocation * avoid scan ddl in handle * some rename * hack * fix * notify max event commit ts as original way * small fix * Revert "avoid heap allocation" This reverts commit 0ea7907. * try optimize * increase batch size * increase ch size * only send resolved ts whn advance * small improvement * small fix * change event processor num * small fix * try less processor * increase processor num * use original processor num * try fix dead lock
1 parent f5162aa commit 722903e

12 files changed

+343
-209
lines changed

logservice/eventstore/event_store.go

+228-103
Large diffs are not rendered by default.

logservice/logpuller/log_puller.go

+8-3
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,9 @@ type spanProgress struct {
4747
resolvedTsUpdated atomic.Int64
4848
resolvedTs atomic.Uint64
4949

50+
// tag is supplied at subscription time and is passed to the consume function.
51+
tag interface{}
52+
5053
consume struct {
5154
// This lock is used to prevent the table progress from being
5255
// removed while consuming events.
@@ -74,7 +77,7 @@ func (p *spanProgress) resolveLock(currentTime time.Time) {
7477
type LogPuller struct {
7578
client *SubscriptionClient
7679
pdClock pdutil.Clock
77-
consume func(context.Context, *common.RawKVEntry, SubscriptionID) error
80+
consume func(context.Context, *common.RawKVEntry, SubscriptionID, interface{}) error
7881

7982
subscriptions struct {
8083
sync.RWMutex
@@ -89,7 +92,7 @@ type LogPuller struct {
8992
func NewLogPuller(
9093
client *SubscriptionClient,
9194
pdClock pdutil.Clock,
92-
consume func(context.Context, *common.RawKVEntry, SubscriptionID) error,
95+
consume func(context.Context, *common.RawKVEntry, SubscriptionID, interface{}) error,
9396
) *LogPuller {
9497
puller := &LogPuller{
9598
client: client,
@@ -155,6 +158,7 @@ func (p *LogPuller) Close(ctx context.Context) error {
155158
func (p *LogPuller) Subscribe(
156159
span heartbeatpb.TableSpan,
157160
startTs uint64,
161+
tag interface{},
158162
) SubscriptionID {
159163
p.subscriptions.Lock()
160164

@@ -163,6 +167,7 @@ func (p *LogPuller) Subscribe(
163167
progress := &spanProgress{
164168
span: span,
165169
subID: subID,
170+
tag: tag,
166171
}
167172

168173
progress.consume.f = func(
@@ -173,7 +178,7 @@ func (p *LogPuller) Subscribe(
173178
progress.consume.RLock()
174179
defer progress.consume.RUnlock()
175180
if !progress.consume.removed {
176-
return p.consume(ctx, raw, subID)
181+
return p.consume(ctx, raw, subID, progress.tag)
177182
}
178183
return nil
179184
}

logservice/logpuller/log_puller_multi_span.go

+2-5
Original file line numberDiff line numberDiff line change
@@ -72,10 +72,7 @@ func NewLogPullerMultiSpan(
7272
}
7373

7474
// consumeWrapper may be called concurrently
75-
consumeWrapper := func(ctx context.Context, entry *common.RawKVEntry, subID SubscriptionID) error {
76-
if entry == nil {
77-
return nil
78-
}
75+
consumeWrapper := func(ctx context.Context, entry *common.RawKVEntry, subID SubscriptionID, _ interface{}) error {
7976
if entry.IsResolved() {
8077
pullerWrapper.tryUpdatePendingResolvedTs(subID, entry.CRTs)
8178
return nil
@@ -85,7 +82,7 @@ func NewLogPullerMultiSpan(
8582

8683
pullerWrapper.innerPuller = NewLogPuller(client, pdClock, consumeWrapper)
8784
for _, span := range spans {
88-
subID := pullerWrapper.innerPuller.Subscribe(span, startTs)
85+
subID := pullerWrapper.innerPuller.Subscribe(span, startTs, nil)
8986
item := &resolvedTsItem{
9087
resolvedTs: 0,
9188
}

logservice/logpuller/log_puller_multi_span_test.go

+12-20
Original file line numberDiff line numberDiff line change
@@ -92,20 +92,16 @@ func TestMultiplexingPullerResolvedForward(t *testing.T) {
9292
subID1 := subIDs[1]
9393

9494
puller.innerPuller.client.consume(ctx, LogEvent{
95-
regionFeedEvent: regionFeedEvent{
96-
Val: &common.RawKVEntry{
97-
OpType: common.OpTypeResolved,
98-
CRTs: uint64(1000),
99-
},
95+
Val: &common.RawKVEntry{
96+
OpType: common.OpTypeResolved,
97+
CRTs: uint64(1000),
10098
},
10199
SubscriptionID: subID0,
102100
})
103101
puller.innerPuller.client.consume(ctx, LogEvent{
104-
regionFeedEvent: regionFeedEvent{
105-
Val: &common.RawKVEntry{
106-
OpType: common.OpTypeResolved,
107-
CRTs: uint64(1005),
108-
},
102+
Val: &common.RawKVEntry{
103+
OpType: common.OpTypeResolved,
104+
CRTs: uint64(1005),
109105
},
110106
SubscriptionID: subID1,
111107
})
@@ -119,11 +115,9 @@ func TestMultiplexingPullerResolvedForward(t *testing.T) {
119115
}
120116

121117
puller.innerPuller.client.consume(ctx, LogEvent{
122-
regionFeedEvent: regionFeedEvent{
123-
Val: &common.RawKVEntry{
124-
OpType: common.OpTypeResolved,
125-
CRTs: uint64(1002),
126-
},
118+
Val: &common.RawKVEntry{
119+
OpType: common.OpTypeResolved,
120+
CRTs: uint64(1002),
127121
},
128122
SubscriptionID: subID0,
129123
})
@@ -137,11 +131,9 @@ func TestMultiplexingPullerResolvedForward(t *testing.T) {
137131
}
138132

139133
puller.innerPuller.client.consume(ctx, LogEvent{
140-
regionFeedEvent: regionFeedEvent{
141-
Val: &common.RawKVEntry{
142-
OpType: common.OpTypeResolved,
143-
CRTs: uint64(1008),
144-
},
134+
Val: &common.RawKVEntry{
135+
OpType: common.OpTypeResolved,
136+
CRTs: uint64(1008),
145137
},
146138
SubscriptionID: subID0,
147139
})

logservice/logpuller/region_change_event_processor.go

+21-30
Original file line numberDiff line numberDiff line change
@@ -159,9 +159,9 @@ func (w *changeEventProcessor) processEvent(ctx context.Context, event statefulE
159159
// NOTE: context.Canceled won't be treated as an error.
160160
func (w *changeEventProcessor) handleEventEntry(ctx context.Context, x *cdcpb.Event_Entries_, state *regionFeedState) error {
161161
startTs := state.region.subscribedSpan.startTs
162-
emit := func(assembled regionFeedEvent) error {
162+
emit := func(val *common.RawKVEntry) error {
163163
// TODO: add a metric to indicate whether the event is sent successfully.
164-
e := newLogEvent(assembled, state.region.subscribedSpan)
164+
e := newLogEvent(val, state.region.subscribedSpan)
165165
return w.client.consume(ctx, e)
166166
}
167167
tableID := state.region.subscribedSpan.span.TableID
@@ -177,7 +177,7 @@ func (w *changeEventProcessor) doHandle(
177177
x *cdcpb.Event_Entries_,
178178
startTs uint64,
179179
state *regionFeedState,
180-
emit func(assembled regionFeedEvent) error,
180+
emit func(val *common.RawKVEntry) error,
181181
tableID common.TableID,
182182
) error {
183183
regionID, _, _ := state.getRegionMeta()
@@ -269,30 +269,25 @@ func (w *changeEventProcessor) doHandle(
269269
return nil
270270
}
271271

272-
func (w *changeEventProcessor) assembleRowEvent(regionID uint64, entry *cdcpb.Event_Row) (regionFeedEvent, error) {
272+
func (w *changeEventProcessor) assembleRowEvent(regionID uint64, entry *cdcpb.Event_Row) (*common.RawKVEntry, error) {
273273
var opType common.OpType
274274
switch entry.GetOpType() {
275275
case cdcpb.Event_Row_DELETE:
276276
opType = common.OpTypeDelete
277277
case cdcpb.Event_Row_PUT:
278278
opType = common.OpTypePut
279279
default:
280-
return regionFeedEvent{}, cerror.ErrUnknownKVEventType.GenWithStackByArgs(entry.GetOpType(), entry)
280+
return &common.RawKVEntry{}, cerror.ErrUnknownKVEventType.GenWithStackByArgs(entry.GetOpType(), entry)
281281
}
282-
283-
return regionFeedEvent{
282+
return &common.RawKVEntry{
283+
OpType: opType,
284+
Key: entry.Key,
285+
Value: entry.GetValue(),
286+
StartTs: entry.StartTs,
287+
CRTs: entry.CommitTs,
284288
RegionID: regionID,
285-
Val: &common.RawKVEntry{
286-
OpType: opType,
287-
Key: entry.Key,
288-
Value: entry.GetValue(),
289-
StartTs: entry.StartTs,
290-
CRTs: entry.CommitTs,
291-
RegionID: regionID,
292-
OldValue: entry.GetOldValue(),
293-
},
289+
OldValue: entry.GetOldValue(),
294290
}, nil
295-
296291
}
297292

298293
func (w *changeEventProcessor) handleResolvedTs(ctx context.Context, batch resolvedTsBatch) {
@@ -324,20 +319,16 @@ func (w *changeEventProcessor) advanceTableSpan(ctx context.Context, batch resol
324319
state.updateResolvedTs(batch.ts)
325320
}
326321

327-
table := batch.regions[0].region.subscribedSpan
322+
span := batch.regions[0].region.subscribedSpan
328323
now := time.Now().UnixMilli()
329-
lastAdvance := table.lastAdvanceTime.Load()
330-
if now-lastAdvance > int64(w.client.config.AdvanceResolvedTsIntervalInMs) && table.lastAdvanceTime.CompareAndSwap(lastAdvance, now) {
331-
ts := table.rangeLock.ResolvedTs()
332-
// TODO: only send ts when ts is larger than previous ts
333-
if ts > table.startTs {
334-
revent := regionFeedEvent{
335-
Val: &common.RawKVEntry{
336-
OpType: common.OpTypeResolved,
337-
CRTs: ts,
338-
},
339-
}
340-
e := newLogEvent(revent, table)
324+
lastAdvance := span.lastAdvanceTime.Load()
325+
if now-lastAdvance > int64(w.client.config.AdvanceResolvedTsIntervalInMs) && span.lastAdvanceTime.CompareAndSwap(lastAdvance, now) {
326+
ts := span.rangeLock.ResolvedTs()
327+
if ts > span.startTs {
328+
e := newLogEvent(&common.RawKVEntry{
329+
OpType: common.OpTypeResolved,
330+
CRTs: ts,
331+
}, span)
341332
if err := w.client.consume(ctx, e); err != nil {
342333
return
343334
}

logservice/logpuller/subscription_client.go

+5-5
Original file line numberDiff line numberDiff line change
@@ -84,14 +84,14 @@ type regionFeedEvent struct {
8484

8585
// LogEvent wrap a region event with subscriptionID to indicate which subscription it belongs to.
8686
type LogEvent struct {
87-
regionFeedEvent
87+
Val *common.RawKVEntry
8888
SubscriptionID
8989
}
9090

91-
func newLogEvent(e regionFeedEvent, span *subscribedSpan) LogEvent {
91+
func newLogEvent(val *common.RawKVEntry, span *subscribedSpan) LogEvent {
9292
return LogEvent{
93-
regionFeedEvent: e,
94-
SubscriptionID: span.subID,
93+
Val: val,
94+
SubscriptionID: span.subID,
9595
}
9696
}
9797

@@ -868,12 +868,12 @@ func (e *errCache) dispatch(ctx context.Context) error {
868868
ticker := time.NewTicker(10 * time.Millisecond)
869869
sendToErrCh := func() {
870870
e.Lock()
871-
defer e.Unlock()
872871
if len(e.cache) == 0 {
873872
return
874873
}
875874
errInfo := e.cache[0]
876875
e.cache = e.cache[1:]
876+
e.Unlock()
877877
e.errCh <- errInfo
878878
}
879879
for {

logservice/logpuller/subscription_client_test.go

+4-4
Original file line numberDiff line numberDiff line change
@@ -236,8 +236,8 @@ func TestSubscriptionWithFailedTiKV(t *testing.T) {
236236
// it should auto switch to other stores and fetch events finally.
237237
select {
238238
case event := <-consumeCh:
239-
require.Equal(t, common.OpTypeResolved, event.regionFeedEvent.Val.OpType)
240-
require.Equal(t, ts, event.regionFeedEvent.Val.CRTs)
239+
require.Equal(t, common.OpTypeResolved, event.Val.OpType)
240+
require.Equal(t, ts, event.Val.CRTs)
241241
case <-time.After(5 * time.Second):
242242
require.True(t, false, "reconnection not succeed in 5 second")
243243
}
@@ -252,8 +252,8 @@ func TestSubscriptionWithFailedTiKV(t *testing.T) {
252252
// it should auto switch to other stores and fetch events finally.
253253
select {
254254
case event := <-consumeCh:
255-
require.Equal(t, common.OpTypeResolved, event.regionFeedEvent.Val.OpType)
256-
require.Equal(t, ts, event.regionFeedEvent.Val.CRTs)
255+
require.Equal(t, common.OpTypeResolved, event.Val.OpType)
256+
require.Equal(t, ts, event.Val.CRTs)
257257
case <-time.After(5 * time.Second):
258258
require.True(t, false, "reconnection not succeed in 5 second")
259259
}

logservice/schemastore/ddl_job_fetcher.go

-4
Original file line numberDiff line numberDiff line change
@@ -91,10 +91,6 @@ func (p *ddlJobFetcher) close(ctx context.Context) error {
9191
}
9292

9393
func (p *ddlJobFetcher) input(ctx context.Context, rawEvent *common.RawKVEntry) error {
94-
if rawEvent == nil {
95-
return nil
96-
}
97-
9894
if rawEvent.IsResolved() {
9995
p.advanceResolvedTs(uint64(rawEvent.CRTs))
10096
return nil

logservice/schemastore/persist_storage.go

+16
Original file line numberDiff line numberDiff line change
@@ -292,6 +292,22 @@ func (p *persistentStorage) getTableInfo(tableID int64, ts uint64) (*common.Tabl
292292
return store.getTableInfo(ts)
293293
}
294294

295+
// TODO: this may consider some shouldn't be send ddl, like create table, does it matter?
296+
func (p *persistentStorage) getMaxEventCommitTs(tableID int64, ts uint64) uint64 {
297+
p.mu.RLock()
298+
defer p.mu.RUnlock()
299+
if len(p.tablesDDLHistory[tableID]) == 0 {
300+
return 0
301+
}
302+
index := sort.Search(len(p.tablesDDLHistory[tableID]), func(i int) bool {
303+
return p.tablesDDLHistory[tableID][i] > ts
304+
})
305+
if index == 0 {
306+
return 0
307+
}
308+
return p.tablesDDLHistory[tableID][index-1]
309+
}
310+
295311
// TODO: not all ddl in p.tablesDDLHistory should be sent to the dispatcher, verify dispatcher will set the right range
296312
func (p *persistentStorage) fetchTableDDLEvents(tableID int64, tableFilter filter.Filter, start, end uint64) ([]commonEvent.DDLEvent, error) {
297313
// TODO: check a dispatcher from created table start ts > finish ts of create table

logservice/schemastore/schema_store.go

+26-10
Original file line numberDiff line numberDiff line change
@@ -32,14 +32,22 @@ type SchemaStore interface {
3232
// return table info with largest version <= ts
3333
GetTableInfo(tableID int64, ts uint64) (*common.TableInfo, error)
3434

35+
// TODO: how to respect tableFilter
36+
GetTableDDLEventState(tableID int64) DDLEventState
37+
3538
// FetchTableDDLEvents returns the next ddl events which finishedTs are within the range (start, end]
36-
// and it returns a timestamp which means there will be no more ddl events before(<=) it
39+
// The caller must ensure end <= current resolvedTs
3740
// TODO: add a parameter limit
38-
FetchTableDDLEvents(tableID int64, tableFilter filter.Filter, start, end uint64) ([]commonEvent.DDLEvent, uint64, error)
41+
FetchTableDDLEvents(tableID int64, tableFilter filter.Filter, start, end uint64) ([]commonEvent.DDLEvent, error)
3942

4043
FetchTableTriggerDDLEvents(tableFilter filter.Filter, start uint64, limit int) ([]commonEvent.DDLEvent, uint64, error)
4144
}
4245

46+
type DDLEventState struct {
47+
ResolvedTs uint64
48+
MaxEventCommitTs uint64
49+
}
50+
4351
type schemaStore struct {
4452
pdClock pdutil.Clock
4553

@@ -225,19 +233,27 @@ func (s *schemaStore) GetTableInfo(tableID int64, ts uint64) (*common.TableInfo,
225233
return s.dataStorage.getTableInfo(tableID, ts)
226234
}
227235

228-
func (s *schemaStore) FetchTableDDLEvents(tableID int64, tableFilter filter.Filter, start, end uint64) ([]commonEvent.DDLEvent, uint64, error) {
229-
currentResolvedTs := s.resolvedTs.Load()
230-
if currentResolvedTs <= start {
231-
return nil, currentResolvedTs, nil
236+
func (s *schemaStore) GetTableDDLEventState(tableID int64) DDLEventState {
237+
resolvedTs := s.resolvedTs.Load()
238+
maxEventCommitTs := s.dataStorage.getMaxEventCommitTs(tableID, resolvedTs)
239+
return DDLEventState{
240+
ResolvedTs: resolvedTs,
241+
MaxEventCommitTs: maxEventCommitTs,
232242
}
233-
if currentResolvedTs < end {
234-
end = currentResolvedTs
243+
}
244+
245+
func (s *schemaStore) FetchTableDDLEvents(tableID int64, tableFilter filter.Filter, start, end uint64) ([]commonEvent.DDLEvent, error) {
246+
currentResolvedTs := s.resolvedTs.Load()
247+
if end > currentResolvedTs {
248+
log.Panic("end should not be greater than current resolved ts",
249+
zap.Uint64("end", end),
250+
zap.Uint64("currentResolvedTs", currentResolvedTs))
235251
}
236252
events, err := s.dataStorage.fetchTableDDLEvents(tableID, tableFilter, start, end)
237253
if err != nil {
238-
return nil, 0, err
254+
return nil, err
239255
}
240-
return events, end, nil
256+
return events, nil
241257
}
242258

243259
// FetchTableTriggerDDLEvents returns the next ddl events which finishedTs are within the range (start, end]

0 commit comments

Comments
 (0)