Skip to content

Commit d3c9ba0

Browse files
authored
generic bootstrapper (#357)
1 parent 86bb2e9 commit d3c9ba0

File tree

3 files changed

+41
-39
lines changed

3 files changed

+41
-39
lines changed

maintainer/maintainer.go

+3-2
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222
"github.com/flowbehappy/tigate/heartbeatpb"
2323
"github.com/flowbehappy/tigate/logservice/schemastore"
2424
"github.com/flowbehappy/tigate/maintainer/split"
25+
"github.com/flowbehappy/tigate/pkg/bootstrap"
2526
"github.com/flowbehappy/tigate/pkg/common"
2627
appcontext "github.com/flowbehappy/tigate/pkg/common/context"
2728
configNew "github.com/flowbehappy/tigate/pkg/config"
@@ -62,7 +63,7 @@ type Maintainer struct {
6263
checkpointTsByCapture map[node.ID]heartbeatpb.Watermark
6364

6465
state heartbeatpb.ComponentState
65-
bootstrapper *Bootstrapper
66+
bootstrapper *bootstrap.Bootstrapper[heartbeatpb.MaintainerBootstrapResponse]
6667

6768
changefeedSate model.FeedState
6869

@@ -145,7 +146,7 @@ func NewMaintainer(cfID model.ChangeFeedID,
145146
tableCountGauge: metrics.TableGauge.WithLabelValues(cfID.Namespace, cfID.ID),
146147
handleEventDuration: metrics.MaintainerHandleEventDuration.WithLabelValues(cfID.Namespace, cfID.ID),
147148
}
148-
m.bootstrapper = NewBootstrapper(m.id.ID, m.getNewBootstrapFn())
149+
m.bootstrapper = bootstrap.NewBootstrapper[heartbeatpb.MaintainerBootstrapResponse](m.id.ID, m.getNewBootstrapFn())
149150
m.barrier = NewBarrier(m.controller)
150151
log.Info("maintainer is created", zap.String("id", cfID.String()))
151152
metrics.MaintainerGauge.WithLabelValues(cfID.Namespace, cfID.ID).Inc()

maintainer/bootstrap.go pkg/bootstrap/bootstrap.go

+29-29
Original file line numberDiff line numberDiff line change
@@ -11,12 +11,11 @@
1111
// See the License for the specific language governing permissions and
1212
// limitations under the License.
1313

14-
package maintainer
14+
package bootstrap
1515

1616
import (
1717
"time"
1818

19-
"github.com/flowbehappy/tigate/heartbeatpb"
2019
"github.com/flowbehappy/tigate/pkg/messaging"
2120
"github.com/flowbehappy/tigate/pkg/node"
2221
"github.com/flowbehappy/tigate/scheduler"
@@ -28,13 +27,13 @@ import (
2827
// when a maintainer is started, it must wait of node to reported their managed dispatchers
2928
// only all dispatcher has reported its status, maintainer can schedule tables
3029
// maintainer collects all working dispatchers and
31-
type Bootstrapper struct {
32-
// changefeedID is the log identifier
33-
changefeedID string
30+
type Bootstrapper[T any] struct {
31+
// id is the log identifier
32+
id string
3433
// bootstrap identify is the bootstrapper is already bootstrapped
3534
bootstrapped bool
3635

37-
nodes map[node.ID]*NodeStatus
36+
nodes map[node.ID]*NodeStatus[T]
3837
newBootstrapMsg scheduler.NewBootstrapFn
3938

4039
// for ut test
@@ -43,10 +42,10 @@ type Bootstrapper struct {
4342
}
4443

4544
// NewBootstrapper create a new bootstrap for a changefeed maintainer
46-
func NewBootstrapper(cfID string, newBootstrapMsg scheduler.NewBootstrapFn) *Bootstrapper {
47-
return &Bootstrapper{
48-
changefeedID: cfID,
49-
nodes: make(map[node.ID]*NodeStatus),
45+
func NewBootstrapper[T any](id string, newBootstrapMsg scheduler.NewBootstrapFn) *Bootstrapper[T] {
46+
return &Bootstrapper[T]{
47+
id: id,
48+
nodes: make(map[node.ID]*NodeStatus[T]),
5049
bootstrapped: false,
5150
newBootstrapMsg: newBootstrapMsg,
5251
timeNowFunc: time.Now,
@@ -55,13 +54,14 @@ func NewBootstrapper(cfID string, newBootstrapMsg scheduler.NewBootstrapFn) *Boo
5554
}
5655

5756
// HandleBootstrapResponse cache the message reported remote node
58-
func (b *Bootstrapper) HandleBootstrapResponse(
57+
// return cached bootstrap response if all node are initialized
58+
func (b *Bootstrapper[T]) HandleBootstrapResponse(
5959
from node.ID,
60-
msg *heartbeatpb.MaintainerBootstrapResponse) map[node.ID]*heartbeatpb.MaintainerBootstrapResponse {
60+
msg *T) map[node.ID]*T {
6161
status, ok := b.nodes[from]
6262
if !ok {
6363
log.Warn("node is not found, ignore",
64-
zap.String("changefeed", b.changefeedID),
64+
zap.String("changefeed", b.id),
6565
zap.Any("from", from))
6666
return nil
6767
}
@@ -71,14 +71,14 @@ func (b *Bootstrapper) HandleBootstrapResponse(
7171
}
7272

7373
// HandleNewNodes add node to bootstrapper and return rpc messages that need to be sent to remote node
74-
func (b *Bootstrapper) HandleNewNodes(nodes []*node.Info) []*messaging.TargetMessage {
74+
func (b *Bootstrapper[T]) HandleNewNodes(nodes []*node.Info) []*messaging.TargetMessage {
7575
msgs := make([]*messaging.TargetMessage, 0, len(nodes))
7676
for _, info := range nodes {
7777
if _, ok := b.nodes[info.ID]; !ok {
7878
// A new server.
79-
b.nodes[info.ID] = NewNodeStatus(info)
79+
b.nodes[info.ID] = NewNodeStatus[T](info)
8080
log.Info("find a new server",
81-
zap.String("changefeed", b.changefeedID),
81+
zap.String("changefeed", b.id),
8282
zap.String("captureAddr", info.AdvertiseAddr),
8383
zap.Any("server", info.ID))
8484
msgs = append(msgs, b.newBootstrapMsg(info.ID))
@@ -91,26 +91,26 @@ func (b *Bootstrapper) HandleNewNodes(nodes []*node.Info) []*messaging.TargetMes
9191
// HandleRemoveNodes remove node from bootstrapper,
9292
// finished bootstrap if all node are initialized after these node removed
9393
// return cached bootstrap
94-
func (b *Bootstrapper) HandleRemoveNodes(nodeIDs []node.ID) map[node.ID]*heartbeatpb.MaintainerBootstrapResponse {
94+
func (b *Bootstrapper[T]) HandleRemoveNodes(nodeIDs []node.ID) map[node.ID]*T {
9595
for _, id := range nodeIDs {
9696
status, ok := b.nodes[id]
9797
if ok {
9898
delete(b.nodes, id)
9999
log.Info("remove node from bootstrap",
100-
zap.String("changefeed", b.changefeedID),
100+
zap.String("changefeed", b.id),
101101
zap.Int("status", int(status.state)),
102102
zap.Any("id", id))
103103
} else {
104104
log.Info("node is node tracked by bootstrap",
105-
zap.String("changefeed", b.changefeedID),
105+
zap.String("changefeed", b.id),
106106
zap.Any("id", id))
107107
}
108108
}
109109
return b.fistBootstrap()
110110
}
111111

112112
// ResendBootstrapMessage return rpc message that need to be resent
113-
func (b *Bootstrapper) ResendBootstrapMessage() []*messaging.TargetMessage {
113+
func (b *Bootstrapper[T]) ResendBootstrapMessage() []*messaging.TargetMessage {
114114
var msgs []*messaging.TargetMessage
115115
if !b.CheckAllNodeInitialized() {
116116
now := b.timeNowFunc()
@@ -126,18 +126,18 @@ func (b *Bootstrapper) ResendBootstrapMessage() []*messaging.TargetMessage {
126126
}
127127

128128
// GetAllNodes return all nodes the tracked by bootstrapper, the returned value must not be modified
129-
func (b *Bootstrapper) GetAllNodes() map[node.ID]*NodeStatus {
129+
func (b *Bootstrapper[T]) GetAllNodes() map[node.ID]*NodeStatus[T] {
130130
return b.nodes
131131
}
132132

133133
// CheckAllNodeInitialized check if all server is initialized.
134134
// returns true when all server reports the bootstrap response and bootstrapped
135-
func (b *Bootstrapper) CheckAllNodeInitialized() bool {
135+
func (b *Bootstrapper[T]) CheckAllNodeInitialized() bool {
136136
return b.bootstrapped && b.checkAllCaptureInitialized()
137137
}
138138

139139
// return true if all node reports the bootstrap response
140-
func (b *Bootstrapper) checkAllCaptureInitialized() bool {
140+
func (b *Bootstrapper[T]) checkAllCaptureInitialized() bool {
141141
for _, captureStatus := range b.nodes {
142142
// CaptureStateStopping is also considered initialized, because when
143143
// a server shutdown, it becomes stopping, we need to move its tables
@@ -153,11 +153,11 @@ func (b *Bootstrapper) checkAllCaptureInitialized() bool {
153153
// return nil is not bootstrapped or already bootstrapped before
154154
// return cached heartbeatpb.MaintainerBootstrapResponse map if it's not bootstrapped before
155155
// bootstrapper only return once
156-
func (b *Bootstrapper) fistBootstrap() map[node.ID]*heartbeatpb.MaintainerBootstrapResponse {
156+
func (b *Bootstrapper[T]) fistBootstrap() map[node.ID]*T {
157157
// first bootstrapped time, return the cached resp and clear it
158158
if !b.bootstrapped && b.checkAllCaptureInitialized() {
159159
b.bootstrapped = true
160-
allCachedResp := make(map[node.ID]*heartbeatpb.MaintainerBootstrapResponse, len(b.nodes))
160+
allCachedResp := make(map[node.ID]*T, len(b.nodes))
161161
for _, status := range b.nodes {
162162
allCachedResp[status.node.ID] = status.cachedBootstrapResp
163163
// clear the cached data
@@ -178,17 +178,17 @@ const (
178178
NodeStateInitialized NodeState = 2
179179
)
180180

181-
func NewNodeStatus(node *node.Info) *NodeStatus {
182-
return &NodeStatus{
181+
func NewNodeStatus[T any](node *node.Info) *NodeStatus[T] {
182+
return &NodeStatus[T]{
183183
state: NodeStateUninitialized,
184184
node: node,
185185
}
186186
}
187187

188188
// NodeStatus identify the node the need be bootstrapped
189-
type NodeStatus struct {
189+
type NodeStatus[T any] struct {
190190
state NodeState
191191
node *node.Info
192-
cachedBootstrapResp *heartbeatpb.MaintainerBootstrapResponse
192+
cachedBootstrapResp *T
193193
lastBootstrapTime time.Time
194194
}

maintainer/bootstrap_test.go pkg/bootstrap/bootstrap_test.go

+9-8
Original file line numberDiff line numberDiff line change
@@ -11,20 +11,21 @@
1111
// See the License for the specific language governing permissions and
1212
// limitations under the License.
1313

14-
package maintainer
14+
package bootstrap
1515

1616
import (
17-
"github.com/flowbehappy/tigate/pkg/node"
1817
"testing"
1918
"time"
2019

20+
"github.com/flowbehappy/tigate/pkg/node"
21+
2122
"github.com/flowbehappy/tigate/heartbeatpb"
2223
"github.com/flowbehappy/tigate/pkg/messaging"
2324
"github.com/stretchr/testify/require"
2425
)
2526

2627
func TestHandleBootstrapResponse(t *testing.T) {
27-
b := NewBootstrapper("test", func(id node.ID) *messaging.TargetMessage {
28+
b := NewBootstrapper[heartbeatpb.MaintainerBootstrapResponse]("test", func(id node.ID) *messaging.TargetMessage {
2829
return &messaging.TargetMessage{}
2930
})
3031
msgs := b.HandleNewNodes([]*node.Info{{ID: "ab"}, {ID: "cd"}})
@@ -60,7 +61,7 @@ func TestHandleBootstrapResponse(t *testing.T) {
6061
}
6162

6263
func TestAddNewNode(t *testing.T) {
63-
b := NewBootstrapper("test", func(id node.ID) *messaging.TargetMessage {
64+
b := NewBootstrapper[heartbeatpb.MaintainerBootstrapResponse]("test", func(id node.ID) *messaging.TargetMessage {
6465
return &messaging.TargetMessage{}
6566
})
6667
msgs := b.HandleNewNodes([]*node.Info{{ID: "ab"}})
@@ -75,7 +76,7 @@ func TestAddNewNode(t *testing.T) {
7576
}
7677

7778
func TestHandleRemoveNodes(t *testing.T) {
78-
b := NewBootstrapper("test", func(id node.ID) *messaging.TargetMessage {
79+
b := NewBootstrapper[heartbeatpb.MaintainerBootstrapResponse]("test", func(id node.ID) *messaging.TargetMessage {
7980
return &messaging.TargetMessage{}
8081
})
8182
msgs := b.HandleNewNodes([]*node.Info{{ID: "ab"}, {ID: "cd"}})
@@ -96,7 +97,7 @@ func TestHandleRemoveNodes(t *testing.T) {
9697
}
9798

9899
func TestResendBootstrapMessage(t *testing.T) {
99-
b := NewBootstrapper("test", func(id node.ID) *messaging.TargetMessage {
100+
b := NewBootstrapper[heartbeatpb.MaintainerBootstrapResponse]("test", func(id node.ID) *messaging.TargetMessage {
100101
return &messaging.TargetMessage{
101102
To: id,
102103
}
@@ -119,7 +120,7 @@ func TestResendBootstrapMessage(t *testing.T) {
119120
}
120121

121122
func TestCheckAllNodeInitialized(t *testing.T) {
122-
b := NewBootstrapper("test", func(id node.ID) *messaging.TargetMessage {
123+
b := NewBootstrapper[heartbeatpb.MaintainerBootstrapResponse]("test", func(id node.ID) *messaging.TargetMessage {
123124
return &messaging.TargetMessage{}
124125
})
125126
msgs := b.HandleNewNodes([]*node.Info{{ID: "ab"}})
@@ -135,7 +136,7 @@ func TestCheckAllNodeInitialized(t *testing.T) {
135136
}
136137

137138
func TestGetAllNodes(t *testing.T) {
138-
b := NewBootstrapper("test", func(id node.ID) *messaging.TargetMessage {
139+
b := NewBootstrapper[heartbeatpb.MaintainerBootstrapResponse]("test", func(id node.ID) *messaging.TargetMessage {
139140
return &messaging.TargetMessage{}
140141
})
141142
b.HandleNewNodes([]*node.Info{{ID: "ab"}, {ID: "cd"}})

0 commit comments

Comments
 (0)