Skip to content

Commit 39254bb

Browse files
committed
add split checker basic logic
1 parent b9317f5 commit 39254bb

13 files changed

+482
-35
lines changed

maintainer/checker/balance_checker.go

+49
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
// Copyright 2024 PingCAP, Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// See the License for the specific language governing permissions and
12+
// limitations under the License.
13+
14+
package checker
15+
16+
import (
17+
"github.com/flowbehappy/tigate/maintainer/operator"
18+
"github.com/flowbehappy/tigate/maintainer/replica"
19+
"github.com/flowbehappy/tigate/server/watcher"
20+
)
21+
22+
// BalanceChecker is used to check the balance status of all spans among all nodes
23+
type BalanceChecker struct {
24+
changefeedID string
25+
operatorController *operator.Controller
26+
replicationDB *replica.ReplicationDB
27+
nodeManager *watcher.NodeManager
28+
}
29+
30+
func NewBalanceChecker(
31+
changefeedID string,
32+
oc *operator.Controller,
33+
db *replica.ReplicationDB,
34+
nodeManager *watcher.NodeManager) *BalanceChecker {
35+
return &BalanceChecker{
36+
changefeedID: changefeedID,
37+
operatorController: oc,
38+
replicationDB: db,
39+
nodeManager: nodeManager,
40+
}
41+
}
42+
43+
func (b *BalanceChecker) Check() {
44+
45+
}
46+
47+
func (b *BalanceChecker) Name() string {
48+
return "balance-checker"
49+
}
+75
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
// Copyright 2024 PingCAP, Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// See the License for the specific language governing permissions and
12+
// limitations under the License.
13+
14+
package checker
15+
16+
import (
17+
"time"
18+
19+
"github.com/flowbehappy/tigate/maintainer/operator"
20+
"github.com/flowbehappy/tigate/maintainer/replica"
21+
"github.com/flowbehappy/tigate/maintainer/split"
22+
"github.com/flowbehappy/tigate/server/watcher"
23+
)
24+
25+
// Controller is the controller of all checkers, it will periodically execute all checkers
26+
type Controller struct {
27+
batchSize int
28+
changefeedID string
29+
operatorController *operator.Controller
30+
replicationDB *replica.ReplicationDB
31+
nodeManager *watcher.NodeManager
32+
33+
maxTimePerRound time.Duration
34+
checkers []Checker
35+
checkedIndex int
36+
}
37+
38+
func NewController(changefeedID string,
39+
splitter *split.Splitter,
40+
oc *operator.Controller,
41+
db *replica.ReplicationDB,
42+
nodeManager *watcher.NodeManager) *Controller {
43+
c := &Controller{
44+
changefeedID: changefeedID,
45+
operatorController: oc,
46+
replicationDB: db,
47+
nodeManager: nodeManager,
48+
maxTimePerRound: time.Second * 5,
49+
}
50+
c.checkers = []Checker{
51+
NewSplitChecker(changefeedID, splitter, oc, db, nodeManager),
52+
NewBalanceChecker(changefeedID, oc, db, nodeManager),
53+
}
54+
return c
55+
}
56+
57+
// Execute periodically execute the operator
58+
func (ctl *Controller) Execute() time.Time {
59+
now := time.Now()
60+
for {
61+
if ctl.checkedIndex >= len(ctl.checkers) {
62+
ctl.checkedIndex = 0
63+
break
64+
}
65+
checker := ctl.checkers[ctl.checkedIndex]
66+
checker.Check()
67+
ctl.checkedIndex++
68+
69+
if time.Since(now) > ctl.maxTimePerRound {
70+
ctl.checkedIndex = ctl.checkedIndex % len(ctl.checkers)
71+
break
72+
}
73+
}
74+
return time.Now().Add(time.Second * 5)
75+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
// Copyright 2024 PingCAP, Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// See the License for the specific language governing permissions and
12+
// limitations under the License.
13+
14+
package checker
15+
16+
import (
17+
"testing"
18+
"time"
19+
20+
"github.com/stretchr/testify/require"
21+
)
22+
23+
func TestControllerExecute(t *testing.T) {
24+
ctl := NewController("test", nil, nil, nil, nil)
25+
require.Equal(t, 2, len(ctl.checkers))
26+
ctl.maxTimePerRound = time.Hour
27+
ctl.Execute()
28+
require.Equal(t, 0, ctl.checkedIndex)
29+
// only check the first checker
30+
ctl.maxTimePerRound = -1
31+
ctl.Execute()
32+
require.Equal(t, 1, ctl.checkedIndex)
33+
ctl.Execute()
34+
require.Equal(t, 0, ctl.checkedIndex)
35+
}

maintainer/checker/checker.go

+25
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
// Copyright 2024 PingCAP, Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// See the License for the specific language governing permissions and
12+
// limitations under the License.
13+
14+
package checker
15+
16+
// Checker is the interface for all checkers,
17+
// checker are used to check the status of the system, like balance, split, merge etc.
18+
// checker will be scheduled by the controller periodically and submit the result to the operator
19+
// checker is not an urgent task, don't have to be executed timely, it's ok to be delayed
20+
type Checker interface {
21+
// Name returns the name of the checker
22+
Name() string
23+
// Check the main function of the checker, it will be called by the controller periodically
24+
Check()
25+
}

maintainer/checker/split_checker.go

+100
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
// Copyright 2024 PingCAP, Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// See the License for the specific language governing permissions and
12+
// limitations under the License.
13+
14+
package checker
15+
16+
import (
17+
"context"
18+
"time"
19+
20+
"github.com/flowbehappy/tigate/maintainer/operator"
21+
"github.com/flowbehappy/tigate/maintainer/replica"
22+
"github.com/flowbehappy/tigate/maintainer/split"
23+
"github.com/flowbehappy/tigate/server/watcher"
24+
"github.com/pingcap/log"
25+
"go.uber.org/zap"
26+
)
27+
28+
// SplitChecker is used to check the split status of all spans
29+
type SplitChecker struct {
30+
changefeedID string
31+
splitter *split.Splitter
32+
opController *operator.Controller
33+
db *replica.ReplicationDB
34+
nodeManager *watcher.NodeManager
35+
36+
maxCheckTime time.Duration
37+
checkInterval time.Duration
38+
lastCheckTime time.Time
39+
40+
checkedIndex int
41+
cachedSpans []*replica.SpanReplication
42+
}
43+
44+
func NewSplitChecker(
45+
changefeedID string,
46+
splitter *split.Splitter,
47+
opController *operator.Controller,
48+
db *replica.ReplicationDB,
49+
nodeManager *watcher.NodeManager) *SplitChecker {
50+
return &SplitChecker{
51+
changefeedID: changefeedID,
52+
splitter: splitter,
53+
opController: opController,
54+
db: db,
55+
nodeManager: nodeManager,
56+
57+
maxCheckTime: time.Second * 5,
58+
checkInterval: time.Second * 120,
59+
}
60+
}
61+
62+
func (s *SplitChecker) Name() string {
63+
return "split-checker"
64+
}
65+
66+
func (s *SplitChecker) Check() {
67+
if s.splitter == nil {
68+
return
69+
}
70+
if time.Since(s.lastCheckTime) < s.checkInterval {
71+
return
72+
}
73+
if s.cachedSpans == nil {
74+
s.cachedSpans = s.db.GetReplicating()
75+
s.checkedIndex = 0
76+
}
77+
start := time.Now()
78+
for ; s.checkedIndex < len(s.cachedSpans); s.checkedIndex++ {
79+
span := s.cachedSpans[s.checkedIndex]
80+
if s.db.GetTaskByID(span.ID) == nil {
81+
continue
82+
}
83+
spans := s.splitter.SplitSpans(context.Background(), span.Span, len(s.nodeManager.GetAliveNodes()))
84+
if len(spans) > 1 {
85+
log.Info("split span",
86+
zap.String("changefeed", s.changefeedID),
87+
zap.String("span", span.ID.String()),
88+
zap.Int("span szie", len(spans)))
89+
s.opController.AddOperator(operator.NewSplitDispatcherOperator(s.db, span, span.GetNodeID(), spans))
90+
}
91+
if time.Since(start) > s.maxCheckTime {
92+
break
93+
}
94+
}
95+
if s.checkedIndex >= len(s.cachedSpans) {
96+
s.cachedSpans = nil
97+
s.checkedIndex = 0
98+
s.lastCheckTime = time.Now()
99+
}
100+
}

maintainer/maintainer_controller.go

+8
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import (
1818
"time"
1919

2020
"github.com/flowbehappy/tigate/heartbeatpb"
21+
"github.com/flowbehappy/tigate/maintainer/checker"
2122
"github.com/flowbehappy/tigate/maintainer/operator"
2223
"github.com/flowbehappy/tigate/maintainer/replica"
2324
"github.com/flowbehappy/tigate/maintainer/scheduler"
@@ -47,6 +48,7 @@ type Controller struct {
4748

4849
spanScheduler *scheduler.Scheduler
4950
operatorController *operator.Controller
51+
checkController *checker.Controller
5052
replicationDB *replica.ReplicationDB
5153
messageCenter messaging.MessageCenter
5254
nodeManager *watcher.NodeManager
@@ -62,6 +64,7 @@ type Controller struct {
6264
taskScheduler threadpool.ThreadPool
6365
operatorControllerHandle *threadpool.TaskHandle
6466
schedulerHandle *threadpool.TaskHandle
67+
checkerHandle *threadpool.TaskHandle
6568
}
6669

6770
func NewController(changefeedID string,
@@ -91,6 +94,7 @@ func NewController(changefeedID string,
9194
s.splitter = split.NewSplitter(changefeedID, pdapi, regionCache, config)
9295
s.spanReplicationEnabled = true
9396
}
97+
s.checkController = checker.NewController(changefeedID, s.splitter, oc, replicaSetDB, nodeManager)
9498
return s
9599
}
96100

@@ -221,6 +225,7 @@ func (c *Controller) FinishBootstrap(workingMap map[int64]utils.Map[*heartbeatpb
221225
// start operator and scheduler
222226
c.operatorControllerHandle = c.taskScheduler.Submit(c.spanScheduler, time.Now())
223227
c.schedulerHandle = c.taskScheduler.Submit(c.operatorController, time.Now())
228+
c.checkerHandle = c.taskScheduler.Submit(c.checkController, time.Now().Add(time.Second*120))
224229
c.bootstrapped = true
225230
c.initialTables = nil
226231
}
@@ -232,6 +237,9 @@ func (c *Controller) Stop() {
232237
if c.schedulerHandle != nil {
233238
c.schedulerHandle.Cancel()
234239
}
240+
if c.checkerHandle != nil {
241+
c.checkerHandle.Cancel()
242+
}
235243
}
236244

237245
// GetTask queries a task by dispatcherID, return nil if not found

0 commit comments

Comments
 (0)