@@ -15,11 +15,13 @@ package dispatcher
15
15
16
16
import (
17
17
"sync/atomic"
18
+ "time"
18
19
19
20
tisink "github.com/flowbehappy/tigate/downstreamadapter/sink"
20
21
"github.com/flowbehappy/tigate/downstreamadapter/sink/types"
21
22
"github.com/flowbehappy/tigate/heartbeatpb"
22
23
"github.com/flowbehappy/tigate/pkg/common"
24
+ commonEvent "github.com/flowbehappy/tigate/pkg/common/event"
23
25
"github.com/flowbehappy/tigate/pkg/filter"
24
26
"github.com/pingcap/log"
25
27
"github.com/pingcap/tidb/pkg/parser/model"
@@ -66,18 +68,16 @@ type Dispatcher struct {
66
68
67
69
statusesChan chan * heartbeatpb.TableSpanStatus
68
70
69
- //SyncPointInfo *SyncPointInfo
70
-
71
- //MemoryUsage *MemoryUsage
71
+ SyncPointInfo * SyncPointInfo
72
72
73
73
componentStatus * ComponentStateWithMutex
74
74
75
75
filter filter.Filter
76
76
77
77
resolvedTs * TsWithMutex // 用来记 中目前收到的 event 中收到的最大的 commitTs - 1,不代表 dispatcher 的 checkpointTs
78
78
79
- ddlPendingEvent * common. DDLEvent
80
- isRemoving atomic.Bool
79
+ blockPendingEvent commonEvent. BlockEvent
80
+ isRemoving atomic.Bool
81
81
82
82
tableProgress * types.TableProgress
83
83
@@ -91,19 +91,27 @@ type Dispatcher struct {
91
91
tableNameStore * TableNameStore
92
92
}
93
93
94
- func NewDispatcher (id common.DispatcherID , tableSpan * heartbeatpb.TableSpan , sink tisink.Sink , startTs uint64 , statusesChan chan * heartbeatpb.TableSpanStatus , filter filter.Filter , schemaID int64 , schemaIDToDispatchers * SchemaIDToDispatchers ) * Dispatcher {
94
+ func NewDispatcher (
95
+ id common.DispatcherID ,
96
+ tableSpan * heartbeatpb.TableSpan ,
97
+ sink tisink.Sink ,
98
+ startTs uint64 ,
99
+ statusesChan chan * heartbeatpb.TableSpanStatus ,
100
+ filter filter.Filter ,
101
+ schemaID int64 ,
102
+ schemaIDToDispatchers * SchemaIDToDispatchers ,
103
+ syncPointInfo * SyncPointInfo ) * Dispatcher {
95
104
dispatcher := & Dispatcher {
96
- id : id ,
97
- tableSpan : tableSpan ,
98
- sink : sink ,
99
- statusesChan : statusesChan ,
100
- //SyncPointInfo: syncPointInfo,
101
- //MemoryUsage: NewMemoryUsage(),
105
+ id : id ,
106
+ tableSpan : tableSpan ,
107
+ sink : sink ,
108
+ statusesChan : statusesChan ,
109
+ SyncPointInfo : syncPointInfo ,
102
110
componentStatus : newComponentStateWithMutex (heartbeatpb .ComponentState_Working ),
103
111
resolvedTs : newTsWithMutex (startTs ),
104
112
filter : filter ,
105
113
isRemoving : atomic.Bool {},
106
- ddlPendingEvent : nil ,
114
+ blockPendingEvent : nil ,
107
115
tableProgress : types .NewTableProgress (),
108
116
schemaID : schemaID ,
109
117
schemaIDToDispatchers : schemaIDToDispatchers ,
@@ -136,21 +144,26 @@ func NewDispatcher(id common.DispatcherID, tableSpan *heartbeatpb.TableSpan, sin
136
144
// 2.2 maintainer 通知自己可以 write 或者 pass event
137
145
//
138
146
// TODO:特殊处理有 add index 的逻辑
139
- func (d * Dispatcher ) addDDLEventToSinkWhenAvailable (event * common.DDLEvent ) {
147
+ // Block Event including ddl Event and Sync Point Event
148
+ func (d * Dispatcher ) addBlockEventToSinkWhenAvailable (event commonEvent.BlockEvent ) {
140
149
// 根据 filter 过滤 query 中不需要 send to downstream 的数据
141
150
// 但应当不出现整个 query 都不需要 send to downstream 的 ddl,这种 ddl 不应该发给 dispatcher
142
151
// TODO: ddl 影响到的 tableSpan 也在 filter 中过滤一遍
143
- err := d .filter .FilterDDLEvent (event )
144
- if err != nil {
145
- log .Error ("filter ddl query failed" , zap .Error (err ))
146
- // 这里怎么处理更合适呢?有错然后反上去让 changefeed 报错
147
- return
152
+ if event .GetType () == commonEvent .TypeDDLEvent {
153
+ ddlEvent := event .(* commonEvent.DDLEvent )
154
+ // TODO:看一下这种写法有没有问题,加个测试后面
155
+ err := d .filter .FilterDDLEvent (ddlEvent )
156
+ if err != nil {
157
+ log .Error ("filter ddl query failed" , zap .Error (err ))
158
+ // 这里怎么处理更合适呢?有错然后反上去让 changefeed 报错
159
+ return
160
+ }
148
161
}
149
162
150
- d .ddlPendingEvent = event
163
+ d .blockPendingEvent = event
151
164
152
165
if d .tableProgress .Empty () {
153
- d .DealWithDDLWhenProgressEmpty ()
166
+ d .DealWithBlockEventWhenProgressEmpty ()
154
167
} else {
155
168
d .checkTableProgressEmptyTask = newCheckProgressEmptyTask (d )
156
169
}
@@ -162,7 +175,7 @@ func (d *Dispatcher) addDDLEventToSinkWhenAvailable(event *common.DDLEvent) {
162
175
// 1. If the action is a write, we need to add the ddl event to the sink for writing to downstream(async).
163
176
// 2. If the action is a pass, we just need to pass the event in tableProgress(for correct calculation) and wake the dispatcherEventsHandler
164
177
func (d * Dispatcher ) HandleDispatcherStatus (dispatcherStatus * heartbeatpb.DispatcherStatus ) {
165
- if d .ddlPendingEvent == nil {
178
+ if d .blockPendingEvent == nil {
166
179
if dispatcherStatus .GetAction () != nil {
167
180
// 只可能出现在 event 已经推进了,但是还重复收到了 action 消息的时候,则重发包含 checkpointTs 的心跳
168
181
d .statusesChan <- & heartbeatpb.TableSpanStatus {
@@ -176,11 +189,11 @@ func (d *Dispatcher) HandleDispatcherStatus(dispatcherStatus *heartbeatpb.Dispat
176
189
177
190
action := dispatcherStatus .GetAction ()
178
191
if action != nil {
179
- if action .CommitTs == d .ddlPendingEvent . FinishedTs {
192
+ if action .CommitTs == d .blockPendingEvent . GetCommitTs () {
180
193
if action .Action == heartbeatpb .Action_Write {
181
- d .sink .AddDDLAndSyncPointEvent (d .ddlPendingEvent , d .tableProgress )
194
+ d .sink .AddBlockEvent (d .blockPendingEvent , d .tableProgress )
182
195
} else {
183
- d .sink .PassDDLAndSyncPointEvent (d .ddlPendingEvent , d .tableProgress )
196
+ d .sink .PassBlockEvent (d .blockPendingEvent , d .tableProgress )
184
197
dispatcherEventDynamicStream := GetDispatcherEventsDynamicStream ()
185
198
dispatcherEventDynamicStream .Wake () <- d .id
186
199
}
@@ -193,49 +206,71 @@ func (d *Dispatcher) HandleDispatcherStatus(dispatcherStatus *heartbeatpb.Dispat
193
206
}
194
207
195
208
ack := dispatcherStatus .GetAck ()
196
- if ack != nil && ack .CommitTs == d .ddlPendingEvent . FinishedTs {
209
+ if ack != nil && ack .CommitTs == d .blockPendingEvent . GetCommitTs () {
197
210
d .CancelResendTask ()
198
211
}
199
212
}
200
213
201
- func (d * Dispatcher ) HandleEvent (event common .Event ) (block bool ) {
214
+ func (d * Dispatcher ) HandleEvent (event commonEvent .Event ) (block bool ) {
202
215
switch event .GetType () {
203
- case common .TypeResolvedEvent :
204
- d .resolvedTs .Set (event .(common .ResolvedEvent ).ResolvedTs )
216
+ case commonEvent .TypeResolvedEvent :
217
+ d .resolvedTs .Set (event .(commonEvent .ResolvedEvent ).ResolvedTs )
205
218
return false
206
- case common .TypeDMLEvent :
207
- d .sink .AddDMLEvent (event .(* common .DMLEvent ), d .tableProgress )
219
+ case commonEvent .TypeDMLEvent :
220
+ d .sink .AddDMLEvent (event .(* commonEvent .DMLEvent ), d .tableProgress )
208
221
return false
209
- case common .TypeDDLEvent :
210
- event := event .(* common .DDLEvent )
222
+ case commonEvent .TypeDDLEvent :
223
+ event := event .(* commonEvent .DDLEvent )
211
224
if d .tableNameStore != nil {
212
225
d .tableNameStore .AddEvent (event )
213
226
}
214
227
event .AddPostFlushFunc (func () {
215
228
dispatcherEventDynamicStream := GetDispatcherEventsDynamicStream ()
216
229
dispatcherEventDynamicStream .Wake () <- event .GetDispatcherID ()
217
230
})
218
- d .addDDLEventToSinkWhenAvailable (event )
231
+ d .addBlockEventToSinkWhenAvailable (event )
219
232
return true
233
+ case commonEvent .TypeSyncPointEvent :
234
+ event := event .(* commonEvent.SyncPointEvent )
235
+ event .AddPostFlushFunc (func () {
236
+ dispatcherEventDynamicStream := GetDispatcherEventsDynamicStream ()
237
+ dispatcherEventDynamicStream .Wake () <- event .GetDispatcherID ()
238
+ })
239
+ d .addBlockEventToSinkWhenAvailable (event )
240
+ return true
241
+ default :
242
+ log .Error ("invalid event type" , zap .Any ("event Type" , event .GetType ()))
243
+ }
244
+ return false
245
+ }
246
+
247
+ func shouldBlock (event commonEvent.BlockEvent ) bool {
248
+ switch event .GetType () {
249
+ case commonEvent .TypeDDLEvent :
250
+ ddlEvent := event .(* commonEvent.DDLEvent )
251
+ return filter .ShouldBlock (model .ActionType (ddlEvent .Type ))
252
+ case commonEvent .TypeSyncPointEvent :
253
+ return true
254
+ default :
255
+ log .Error ("invalid event type" , zap .Any ("event Type" , event .GetType ()))
220
256
}
221
- log .Panic ("invalid event type" , zap .Any ("event" , event ))
222
257
return false
223
258
}
224
259
225
260
// 1.If the event is a single table DDL, it will be added to the sink for writing to downstream(async). If the ddl leads to add new tables or drop tables, it should send heartbeat to maintainer
226
- // 2. If the event is a multi-table DDL, it will generate a TableSpanStatus message with ddl info to send to maintainer.
227
- func (d * Dispatcher ) DealWithDDLWhenProgressEmpty () {
228
- if ! filter . ShouldBlock ( model . ActionType ( d . ddlPendingEvent . Type ) ) {
229
- d .sink .AddDDLAndSyncPointEvent (d .ddlPendingEvent , d .tableProgress )
230
- if d .ddlPendingEvent .GetNeedAddedTables () != nil || d .ddlPendingEvent .GetNeedDroppedTables () != nil {
261
+ // 2. If the event is a multi-table DDL / sync point Event , it will generate a TableSpanStatus message with ddl info to send to maintainer.
262
+ func (d * Dispatcher ) DealWithBlockEventWhenProgressEmpty () {
263
+ if ! shouldBlock ( d . blockPendingEvent ) {
264
+ d .sink .AddBlockEvent (d .blockPendingEvent , d .tableProgress )
265
+ if d .blockPendingEvent .GetNeedAddedTables () != nil || d .blockPendingEvent .GetNeedDroppedTables () != nil {
231
266
message := & heartbeatpb.TableSpanStatus {
232
267
ID : d .id .ToPB (),
233
268
ComponentStatus : heartbeatpb .ComponentState_Working ,
234
269
State : & heartbeatpb.State {
235
270
IsBlocked : false ,
236
- BlockTs : d .ddlPendingEvent . FinishedTs ,
237
- NeedDroppedTables : d .ddlPendingEvent .GetNeedDroppedTables ().ToPB (),
238
- NeedAddedTables : common .ToTablesPB (d .ddlPendingEvent .GetNeedAddedTables ()),
271
+ BlockTs : d .blockPendingEvent . GetCommitTs () ,
272
+ NeedDroppedTables : d .blockPendingEvent .GetNeedDroppedTables ().ToPB (),
273
+ NeedAddedTables : commonEvent .ToTablesPB (d .blockPendingEvent .GetNeedAddedTables ()),
239
274
},
240
275
}
241
276
d .SetResendTask (newResendTask (message , d ))
@@ -247,11 +282,12 @@ func (d *Dispatcher) DealWithDDLWhenProgressEmpty() {
247
282
ComponentStatus : heartbeatpb .ComponentState_Working ,
248
283
State : & heartbeatpb.State {
249
284
IsBlocked : true ,
250
- BlockTs : d .ddlPendingEvent .FinishedTs ,
251
- BlockTables : d .ddlPendingEvent .GetBlockedTables ().ToPB (),
252
- NeedDroppedTables : d .ddlPendingEvent .GetNeedDroppedTables ().ToPB (),
253
- NeedAddedTables : common .ToTablesPB (d .ddlPendingEvent .GetNeedAddedTables ()),
254
- UpdatedSchemas : common .ToSchemaIDChangePB (d .ddlPendingEvent .GetUpdatedSchemas ()), // only exists for rename table and rename tables
285
+ BlockTs : d .blockPendingEvent .GetCommitTs (),
286
+ BlockTables : d .blockPendingEvent .GetBlockedTables ().ToPB (),
287
+ NeedDroppedTables : d .blockPendingEvent .GetNeedDroppedTables ().ToPB (),
288
+ NeedAddedTables : commonEvent .ToTablesPB (d .blockPendingEvent .GetNeedAddedTables ()),
289
+ UpdatedSchemas : commonEvent .ToSchemaIDChangePB (d .blockPendingEvent .GetUpdatedSchemas ()), // only exists for rename table and rename tables
290
+ IsSyncPoint : d .blockPendingEvent .GetType () == commonEvent .TypeSyncPointEvent ,
255
291
},
256
292
}
257
293
d .SetResendTask (newResendTask (message , d ))
@@ -267,8 +303,8 @@ func (d *Dispatcher) DealWithDDLWhenProgressEmpty() {
267
303
// So there won't be a related db-level ddl event is in dealing when we get update schema id events.
268
304
// Thus, whether to update schema id before or after current ddl event is not important.
269
305
// To make it easier, we choose to directly update schema id here.
270
- if d .ddlPendingEvent .GetUpdatedSchemas () != nil && d .tableSpan != heartbeatpb .DDLSpan {
271
- for _ , schemaIDChange := range d .ddlPendingEvent .GetUpdatedSchemas () {
306
+ if d .blockPendingEvent .GetUpdatedSchemas () != nil && d .tableSpan != heartbeatpb .DDLSpan {
307
+ for _ , schemaIDChange := range d .blockPendingEvent .GetUpdatedSchemas () {
272
308
if schemaIDChange .TableID == d .tableSpan .TableID {
273
309
if schemaIDChange .OldSchemaID != d .schemaID {
274
310
log .Error ("Wrong Schema ID" , zap .Any ("dispatcherID" , d .id ), zap .Any ("except schemaID" , schemaIDChange .OldSchemaID ), zap .Any ("actual schemaID" , d .schemaID ), zap .Any ("tableSpan" , d .tableSpan .String ()))
@@ -329,9 +365,25 @@ func (d *Dispatcher) GetSchemaID() int64 {
329
365
return d .schemaID
330
366
}
331
367
332
- //func (d *Dispatcher) GetSyncPointInfo() *SyncPointInfo {
333
- // return d.syncPointInfo
334
- // }
368
+ func (d * Dispatcher ) EnableSyncPoint () bool {
369
+ return d .SyncPointInfo .EnableSyncPoint
370
+ }
371
+
372
+ func (d * Dispatcher ) GetSyncPointTs () uint64 {
373
+ if d .SyncPointInfo .EnableSyncPoint {
374
+ return d .SyncPointInfo .InitSyncPointTs
375
+ } else {
376
+ return 0
377
+ }
378
+ }
379
+
380
+ func (d * Dispatcher ) GetSyncPointInterval () time.Duration {
381
+ if d .SyncPointInfo .EnableSyncPoint {
382
+ return d .SyncPointInfo .SyncPointInterval
383
+ } else {
384
+ return time .Duration (0 )
385
+ }
386
+ }
335
387
336
388
func (d * Dispatcher ) Remove () {
337
389
// TODO: 修改这个 dispatcher 的 status 为 removing
0 commit comments