@@ -15,12 +15,13 @@ package watcher
15
15
16
16
import (
17
17
"context"
18
- "github.com/flowbehappy/tigate/pkg/node"
19
- "github.com/pingcap/tiflow/cdc/model"
20
18
"sync"
19
+ "sync/atomic"
21
20
"time"
22
21
22
+ "github.com/flowbehappy/tigate/pkg/node"
23
23
"github.com/pingcap/log"
24
+ "github.com/pingcap/tiflow/cdc/model"
24
25
"github.com/pingcap/tiflow/pkg/config"
25
26
"github.com/pingcap/tiflow/pkg/etcd"
26
27
"github.com/pingcap/tiflow/pkg/orchestrator"
@@ -36,7 +37,7 @@ type NodeChangeHandler func(map[node.ID]*node.Info)
36
37
type NodeManager struct {
37
38
session * concurrency.Session
38
39
etcdClient etcd.CDCEtcdClient
39
- nodes map [node.ID ]* node.Info
40
+ nodes atomic. Pointer [ map [node.ID ]* node.Info ]
40
41
41
42
nodeChangeHandlers struct {
42
43
sync.RWMutex
@@ -48,15 +49,16 @@ func NewNodeManager(
48
49
session * concurrency.Session ,
49
50
etcdClient etcd.CDCEtcdClient ,
50
51
) * NodeManager {
51
- return & NodeManager {
52
+ m := & NodeManager {
52
53
session : session ,
53
54
etcdClient : etcdClient ,
54
- nodes : make (map [node.ID ]* node.Info ),
55
55
nodeChangeHandlers : struct {
56
56
sync.RWMutex
57
57
m map [node.ID ]NodeChangeHandler
58
58
}{m : make (map [node.ID ]NodeChangeHandler )},
59
59
}
60
+ m .nodes .Store (& map [node.ID ]* node.Info {})
61
+ return m
60
62
}
61
63
62
64
func (c * NodeManager ) Name () string {
@@ -73,22 +75,23 @@ func (c *NodeManager) Tick(
73
75
changed := false
74
76
allNodes := make (map [node.ID ]* node.Info , len (state .Captures ))
75
77
76
- for _ , node := range c .nodes {
77
- if _ , exist := state .Captures [model .CaptureID (node .ID )]; ! exist {
78
+ oldMap := * c .nodes .Load ()
79
+ for _ , info := range oldMap {
80
+ if _ , exist := state .Captures [model .CaptureID (info .ID )]; ! exist {
78
81
changed = true
79
82
}
80
83
}
81
84
82
85
for _ , capture := range state .Captures {
83
- if _ , exist := c . nodes [node .ID (capture .ID )]; ! exist {
86
+ if _ , exist := oldMap [node .ID (capture .ID )]; ! exist {
84
87
changed = true
85
88
}
86
89
allNodes [node .ID (capture .ID )] = node .CaptureInfoToNodeInfo (capture )
87
90
}
88
- c .nodes = allNodes
91
+ c .nodes . Store ( & allNodes )
89
92
if changed {
90
93
log .Info ("server change detected" )
91
- // handle node change event
94
+ // handle info change event
92
95
c .nodeChangeHandlers .RLock ()
93
96
defer c .nodeChangeHandlers .RUnlock ()
94
97
for _ , handler := range c .nodeChangeHandlers .m {
@@ -100,7 +103,7 @@ func (c *NodeManager) Tick(
100
103
101
104
// GetAliveNodes get all alive captures, the caller mustn't modify the returned map
102
105
func (c * NodeManager ) GetAliveNodes () map [node.ID ]* node.Info {
103
- return c .nodes
106
+ return * c .nodes . Load ()
104
107
}
105
108
106
109
func (c * NodeManager ) Run (ctx context.Context ) error {
0 commit comments