@@ -25,11 +25,13 @@ import (
25
25
// ReplicationDB is an in memory data struct that maintains the replication spans
26
26
type ReplicationDB struct {
27
27
changefeedID string
28
- nodeTasks map [node.ID ]map [common.DispatcherID ]* SpanReplication
29
- // group the spans by schema id
28
+ // allTasks maintains all the span tasks
29
+ allTasks map [common.DispatcherID ]* SpanReplication
30
+
31
+ // group the tasks by the node id, schema id, and table id for fast access
32
+ nodeTasks map [node.ID ]map [common.DispatcherID ]* SpanReplication
30
33
schemaTasks map [int64 ]map [common.DispatcherID ]* SpanReplication
31
- // group the spans by table id
32
- tableTasks map [int64 ]map [common.DispatcherID ]* SpanReplication
34
+ tableTasks map [int64 ]map [common.DispatcherID ]* SpanReplication
33
35
34
36
// maps that maintained base on the span scheduling status
35
37
replicating map [common.DispatcherID ]* SpanReplication
@@ -48,6 +50,7 @@ func NewReplicaSetDB(changefeedID string) *ReplicationDB {
48
50
schemaTasks : make (map [int64 ]map [common.DispatcherID ]* SpanReplication ),
49
51
tableTasks : make (map [int64 ]map [common.DispatcherID ]* SpanReplication ),
50
52
53
+ allTasks : make (map [common.DispatcherID ]* SpanReplication ),
51
54
replicating : make (map [common.DispatcherID ]* SpanReplication ),
52
55
scheduling : make (map [common.DispatcherID ]* SpanReplication ),
53
56
absent : make (map [common.DispatcherID ]* SpanReplication ),
@@ -60,22 +63,23 @@ func (db *ReplicationDB) GetTaskByID(id common.DispatcherID) *SpanReplication {
60
63
db .lock .RLock ()
61
64
defer db .lock .RUnlock ()
62
65
63
- return db .getTaskByIDUnLock ( id )
66
+ return db .allTasks [ id ]
64
67
}
65
68
66
69
// TaskSize returns the total task size in the db, it includes replicating, scheduling and absent tasks
67
70
func (db * ReplicationDB ) TaskSize () int {
68
71
db .lock .RLock ()
69
72
defer db .lock .RUnlock ()
70
73
71
- return len (db .replicating ) + len ( db . absent ) + len ( db . scheduling )
74
+ return len (db .allTasks )
72
75
}
73
76
74
77
// TryRemoveAll removes non-scheduled tasks from the db and return the scheduled tasks
75
78
func (db * ReplicationDB ) TryRemoveAll () []* SpanReplication {
76
79
db .lock .Lock ()
77
80
defer db .lock .Unlock ()
78
81
82
+ // remove the absent task directly
79
83
for _ , stm := range db .absent {
80
84
db .removeSpanUnLock (stm )
81
85
}
@@ -249,7 +253,7 @@ func (db *ReplicationDB) GetTasksBySchemaID(schemaID int64) []*SpanReplication {
249
253
if ! ok {
250
254
return nil
251
255
}
252
- var replicaSets [] * SpanReplication = make ([]* SpanReplication , 0 , len (sm ))
256
+ var replicaSets = make ([]* SpanReplication , 0 , len (sm ))
253
257
for _ , v := range sm {
254
258
replicaSets = append (replicaSets , v )
255
259
}
@@ -283,8 +287,10 @@ func (db *ReplicationDB) ReplaceReplicaSet(olds []*SpanReplication, news []*Span
283
287
284
288
// first check if all the old replica set exists, if not, return false
285
289
for _ , old := range olds {
286
- dbItem := db .getTaskByIDUnLock (old .ID )
287
- if dbItem == nil {
290
+ if _ , ok := db .allTasks [old .ID ]; ! ok {
291
+ log .Warn ("old replica set not found, skip" ,
292
+ zap .String ("changefeed" , db .changefeedID ),
293
+ zap .String ("span" , old .ID .String ()))
288
294
return false
289
295
}
290
296
}
@@ -306,6 +312,7 @@ func (db *ReplicationDB) AddReplicatingSpan(task *SpanReplication) {
306
312
zap .String ("nodeID" , nodeID .String ()),
307
313
zap .String ("span" , task .ID .String ()))
308
314
315
+ db .allTasks [task .ID ] = task
309
316
db .replicating [task .ID ] = task
310
317
db .updateNodeMap ("" , nodeID , task )
311
318
db .addToSchemaAndTableMap (task )
@@ -411,48 +418,15 @@ func (db *ReplicationDB) BindSpanToNode(old, new node.ID, task *SpanReplication)
411
418
db .updateNodeMap (old , new , task )
412
419
}
413
420
414
- // GetTaskByID returns the replica set by the id, it will search the replicating, scheduling and absent map
415
- func (db * ReplicationDB ) getTaskByIDUnLock (id common.DispatcherID ) * SpanReplication {
416
- r , ok := db .replicating [id ]
417
- if ok {
418
- return r
419
- }
420
- r , ok = db .scheduling [id ]
421
- if ok {
422
- return r
423
- }
424
- return db .absent [id ]
425
- }
426
-
427
421
// addAbsentReplicaSetUnLock adds the replica set to the absent map
428
422
func (db * ReplicationDB ) addAbsentReplicaSetUnLock (tasks ... * SpanReplication ) {
429
423
for _ , task := range tasks {
424
+ db .allTasks [task .ID ] = task
430
425
db .absent [task .ID ] = task
431
426
db .addToSchemaAndTableMap (task )
432
427
}
433
428
}
434
429
435
- // addToSchemaAndTableMap adds the task to the schema and table map
436
- func (db * ReplicationDB ) addToSchemaAndTableMap (task * SpanReplication ) {
437
- tableID := task .Span .TableID
438
- schemaID := task .GetSchemaID ()
439
- // modify the schema map
440
- schemaMap , ok := db .schemaTasks [schemaID ]
441
- if ! ok {
442
- schemaMap = make (map [common.DispatcherID ]* SpanReplication )
443
- db .schemaTasks [schemaID ] = schemaMap
444
- }
445
- schemaMap [task .ID ] = task
446
-
447
- // modify the table map
448
- tableMap , ok := db .tableTasks [tableID ]
449
- if ! ok {
450
- tableMap = make (map [common.DispatcherID ]* SpanReplication )
451
- db .tableTasks [tableID ] = tableMap
452
- }
453
- tableMap [task .ID ] = task
454
- }
455
-
456
430
// removeSpanUnLock removes the replica set from the db without lock
457
431
func (db * ReplicationDB ) removeSpanUnLock (spans ... * SpanReplication ) {
458
432
for _ , span := range spans {
@@ -480,9 +454,31 @@ func (db *ReplicationDB) removeSpanUnLock(spans ...*SpanReplication) {
480
454
if len (nodeMap ) == 0 {
481
455
delete (db .nodeTasks , nodeID )
482
456
}
457
+ delete (db .allTasks , span .ID )
483
458
}
484
459
}
485
460
461
+ // addToSchemaAndTableMap adds the task to the schema and table map
462
+ func (db * ReplicationDB ) addToSchemaAndTableMap (task * SpanReplication ) {
463
+ tableID := task .Span .TableID
464
+ schemaID := task .GetSchemaID ()
465
+ // modify the schema map
466
+ schemaMap , ok := db .schemaTasks [schemaID ]
467
+ if ! ok {
468
+ schemaMap = make (map [common.DispatcherID ]* SpanReplication )
469
+ db .schemaTasks [schemaID ] = schemaMap
470
+ }
471
+ schemaMap [task .ID ] = task
472
+
473
+ // modify the table map
474
+ tableMap , ok := db .tableTasks [tableID ]
475
+ if ! ok {
476
+ tableMap = make (map [common.DispatcherID ]* SpanReplication )
477
+ db .tableTasks [tableID ] = tableMap
478
+ }
479
+ tableMap [task .ID ] = task
480
+ }
481
+
486
482
// updateNodeMap updates the node map, it will remove the task from the old node and add it to the new node
487
483
func (db * ReplicationDB ) updateNodeMap (old , new node.ID , task * SpanReplication ) {
488
484
//clear from the old node
0 commit comments