Skip to content
This repository has been archived by the owner on Feb 6, 2024. It is now read-only.

Commit

Permalink
refactor: refactor by cr
Browse files Browse the repository at this point in the history
  • Loading branch information
ZuLiangWang committed Jul 5, 2023
1 parent bed0dbb commit cc70934
Show file tree
Hide file tree
Showing 19 changed files with 175 additions and 132 deletions.
6 changes: 4 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ require (
go.etcd.io/etcd/client/v3 v3.5.4
go.etcd.io/etcd/server/v3 v3.5.4
go.uber.org/zap v1.21.0
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
golang.org/x/time v0.0.0-20210220033141-f8bda1e9f3ba
golang.org/x/tools v0.1.10
google.golang.org/grpc v1.47.0
google.golang.org/protobuf v1.28.0
Expand Down Expand Up @@ -89,15 +91,15 @@ require (
golang.org/x/crypto v0.0.0-20220131195533-30dcbda58838 // indirect
golang.org/x/mod v0.6.0-dev.0.20220106191415-9b9b3d81d5e3 // indirect
golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2 // indirect
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c // indirect
golang.org/x/sys v0.0.0-20211019181941-9d821ace8654 // indirect
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1 // indirect
golang.org/x/text v0.3.7 // indirect
golang.org/x/time v0.0.0-20210220033141-f8bda1e9f3ba // indirect
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect
google.golang.org/genproto v0.0.0-20210602131652-f16073e35f0c // indirect
gopkg.in/natefinch/lumberjack.v2 v2.0.0 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
sigs.k8s.io/yaml v1.2.0 // indirect
)

replace github.com/CeresDB/ceresdbproto/golang v0.0.0-20230531074927-f60b148b3a5c => /Users/zulliangwang/code/ceres/ceresdbproto/golang
4 changes: 0 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,6 @@ github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03
github.com/BurntSushi/toml v1.1.0 h1:ksErzDEI1khOiGPgpwuI7x2ebx/uXQNw7xJpn9Eq1+I=
github.com/BurntSushi/toml v1.1.0/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbicEuybxQ=
github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo=
github.com/CeresDB/ceresdbproto/golang v0.0.0-20230515021908-1b3a3eae3d60 h1:+/bcJ6M6SnXWjhA80c5Qq6u+LASrPGxoDCMIZoJcmaQ=
github.com/CeresDB/ceresdbproto/golang v0.0.0-20230515021908-1b3a3eae3d60/go.mod h1:qLTh6jtSu2ZLIFsU3iiDIKvkrQvyY/Csg6Mk0Ub0QZ4=
github.com/CeresDB/ceresdbproto/golang v0.0.0-20230531074927-f60b148b3a5c h1:Z/FkMasq2ZTcsKsFuiUaLi26mLyx23mxwlbt1NC/eRY=
github.com/CeresDB/ceresdbproto/golang v0.0.0-20230531074927-f60b148b3a5c/go.mod h1:qLTh6jtSu2ZLIFsU3iiDIKvkrQvyY/Csg6Mk0Ub0QZ4=
github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU=
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
Expand Down
2 changes: 1 addition & 1 deletion server/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func NewCluster(logger *zap.Logger, metadata *metadata.ClusterMetadata, client *
dispatch := eventdispatch.NewDispatchImpl()
procedureFactory := coordinator.NewFactory(id.NewAllocatorImpl(logger, client, defaultProcedurePrefixKey, defaultAllocStep), dispatch, procedureStorage)

schedulerManager := scheduler.NewManager(logger, procedureManager, procedureFactory, metadata, client, rootPath, metadata.GetEnableSchedule(), metadata.GetTopologyType())
schedulerManager := scheduler.NewManager(logger, procedureManager, procedureFactory, metadata, client, rootPath, metadata.GetEnableSchedule(), metadata.GetTopologyType(), metadata.GetMaxConcurrentOpenShardBatchSize())

return &Cluster{
logger: logger,
Expand Down
51 changes: 27 additions & 24 deletions server/cluster/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,14 +112,15 @@ func (m *managerImpl) CreateCluster(ctx context.Context, clusterName string, opt

createTime := time.Now().UnixMilli()
clusterMetadataStorage := storage.Cluster{
ID: clusterID,
Name: clusterName,
MinNodeCount: opts.NodeCount,
ShardTotal: opts.ShardTotal,
EnableSchedule: opts.EnableSchedule,
TopologyType: opts.TopologyType,
CreatedAt: uint64(createTime),
ModifiedAt: uint64(createTime),
ID: clusterID,
Name: clusterName,
MinNodeCount: opts.NodeCount,
ShardTotal: opts.ShardTotal,
EnableSchedule: opts.EnableSchedule,
TopologyType: opts.TopologyType,
MaxConcurrentOpenShardBatchSize: opts.MaxConcurrentOpenShardBatchSize,
CreatedAt: uint64(createTime),
ModifiedAt: uint64(createTime),
}
err = m.storage.CreateCluster(ctx, storage.CreateClusterRequest{
Cluster: clusterMetadataStorage,
Expand Down Expand Up @@ -164,14 +165,15 @@ func (m *managerImpl) UpdateCluster(ctx context.Context, clusterName string, opt
}

err = m.storage.UpdateCluster(ctx, storage.UpdateClusterRequest{Cluster: storage.Cluster{
ID: c.GetMetadata().GetClusterID(),
Name: c.GetMetadata().Name(),
MinNodeCount: c.GetMetadata().GetClusterMinNodeCount(),
ShardTotal: c.GetMetadata().GetTotalShardNum(),
EnableSchedule: opt.EnableSchedule,
TopologyType: opt.TopologyType,
CreatedAt: c.GetMetadata().GetCreateTime(),
ModifiedAt: uint64(time.Now().UnixMilli()),
ID: c.GetMetadata().GetClusterID(),
Name: c.GetMetadata().Name(),
MinNodeCount: c.GetMetadata().GetClusterMinNodeCount(),
ShardTotal: c.GetMetadata().GetTotalShardNum(),
EnableSchedule: opt.EnableSchedule,
TopologyType: opt.TopologyType,
MaxConcurrentOpenShardBatchSize: opt.MaxConcurrentOpenShardBatchSize,
CreatedAt: c.GetMetadata().GetCreateTime(),
ModifiedAt: uint64(time.Now().UnixMilli()),
}})
if err != nil {
log.Error("update cluster", zap.Error(err))
Expand Down Expand Up @@ -333,14 +335,15 @@ func (m *managerImpl) Start(ctx context.Context) error {
if clusterMetadata.GetStorageMetadata().TopologyType == storage.TopologyTypeUnknown {
req := storage.UpdateClusterRequest{
Cluster: storage.Cluster{
ID: metadataStorage.ID,
Name: metadataStorage.Name,
MinNodeCount: metadataStorage.MinNodeCount,
ShardTotal: metadataStorage.ShardTotal,
EnableSchedule: metadataStorage.EnableSchedule,
TopologyType: m.topologyType,
CreatedAt: metadataStorage.CreatedAt,
ModifiedAt: uint64(time.Now().UnixMilli()),
ID: metadataStorage.ID,
Name: metadataStorage.Name,
MinNodeCount: metadataStorage.MinNodeCount,
ShardTotal: metadataStorage.ShardTotal,
EnableSchedule: metadataStorage.EnableSchedule,
TopologyType: m.topologyType,
MaxConcurrentOpenShardBatchSize: metadataStorage.MaxConcurrentOpenShardBatchSize,
CreatedAt: metadataStorage.CreatedAt,
ModifiedAt: uint64(time.Now().UnixMilli()),
},
}
if err := m.storage.UpdateCluster(ctx, req); err != nil {
Expand Down
7 changes: 7 additions & 0 deletions server/cluster/metadata/cluster_metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -576,6 +576,13 @@ func (c *ClusterMetadata) GetTopologyType() storage.TopologyType {
return c.metaData.TopologyType
}

// GetMaxConcurrentOpenShardBatchSize returns the maximum number of shards
// allowed in a single batch when the scheduler opens shards concurrently
// (see the corresponding config field MaxConcurrentOpenShardBatchSize).
// The metadata read lock guards against concurrent updates of the field.
func (c *ClusterMetadata) GetMaxConcurrentOpenShardBatchSize() uint32 {
	c.lock.RLock()
	defer c.lock.RUnlock()

	return c.metaData.MaxConcurrentOpenShardBatchSize
}

func (c *ClusterMetadata) GetCreateTime() uint64 {
c.lock.RLock()
defer c.lock.RUnlock()
Expand Down
16 changes: 9 additions & 7 deletions server/cluster/metadata/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,16 +47,18 @@ type ShardNodeWithVersion struct {
}

type CreateClusterOpts struct {
NodeCount uint32
ReplicationFactor uint32
ShardTotal uint32
EnableSchedule bool
TopologyType storage.TopologyType
NodeCount uint32
ReplicationFactor uint32
ShardTotal uint32
EnableSchedule bool
TopologyType storage.TopologyType
MaxConcurrentOpenShardBatchSize uint32
}

type UpdateClusterOpts struct {
EnableSchedule bool
TopologyType storage.TopologyType
EnableSchedule bool
TopologyType storage.TopologyType
MaxConcurrentOpenShardBatchSize uint32
}

type CreateTableMetadataRequest struct {
Expand Down
7 changes: 6 additions & 1 deletion server/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package config
import (
"flag"
"fmt"
"math"
"os"
"strings"
"time"
Expand Down Expand Up @@ -52,7 +53,8 @@ const (
defaultClusterShardTotal = 8
enableSchedule = true
// topologyType is used to determine the scheduling cluster strategy of CeresMeta. It should be determined according to the storage method of CeresDB. The default is static to support local storage.
defaultTopologyType = "static"
defaultTopologyType = "static"
defaultMaxConcurrentOpenShardBatchSize = math.MaxUint32

defaultHTTPPort = 8080

Expand Down Expand Up @@ -127,6 +129,8 @@ type Config struct {
EnableSchedule bool `toml:"enable-schedule" env:"ENABLE_SCHEDULE"`
// TopologyType indicates the schedule type used by the CeresDB cluster, it will determine the strategy of CeresMeta scheduling cluster.
TopologyType string `toml:"topology-type" env:"TOPOLOGY_TYPE"`
// MaxConcurrentOpenShardBatchSize determines the maximum number of shards in a single batch when opening shards concurrently.
MaxConcurrentOpenShardBatchSize uint32 `toml:"max-concurrent-open-shard-batch-size" env:"MAX-CONCURRENT_OPEN_SHARD_BATCH_SIZE"`

ClientUrls string `toml:"client-urls" env:"CLIENT_URLS"`
PeerUrls string `toml:"peer-urls" env:"PEER_URLS"`
Expand Down Expand Up @@ -296,6 +300,7 @@ func MakeConfigParser() (*Parser, error) {
DefaultClusterShardTotal: defaultClusterShardTotal,
EnableSchedule: enableSchedule,
TopologyType: defaultTopologyType,
MaxConcurrentOpenShardBatchSize: defaultMaxConcurrentOpenShardBatchSize,

HTTPPort: defaultHTTPPort,
}
Expand Down
37 changes: 20 additions & 17 deletions server/coordinator/procedure/test/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"context"
"crypto/rand"
"fmt"
"math"
"math/big"
"testing"
"time"
Expand All @@ -21,16 +22,17 @@ import (
)

const (
TestTableName0 = "table0"
TestTableName1 = "table1"
TestSchemaName = "TestSchemaName"
TestRootPath = "/rootPath"
DefaultIDAllocatorStep = 20
ClusterName = "ceresdbCluster1"
DefaultNodeCount = 2
DefaultShardTotal = 4
DefaultSchedulerOperator = true
DefaultTopologyType = "static"
TestTableName0 = "table0"
TestTableName1 = "table1"
TestSchemaName = "TestSchemaName"
TestRootPath = "/rootPath"
DefaultIDAllocatorStep = 20
ClusterName = "ceresdbCluster1"
DefaultNodeCount = 2
DefaultShardTotal = 4
DefaultSchedulerOperator = true
DefaultTopologyType = "static"
DefaultMaxConcurrentOpenShardBatchSize = math.MaxUint32
)

type MockDispatch struct{}
Expand Down Expand Up @@ -99,13 +101,14 @@ func InitEmptyCluster(ctx context.Context, t *testing.T) *cluster.Cluster {
logger := zap.NewNop()

clusterMetadata := metadata.NewClusterMetadata(logger, storage.Cluster{
ID: 0,
Name: ClusterName,
MinNodeCount: DefaultNodeCount,
ShardTotal: DefaultShardTotal,
EnableSchedule: DefaultSchedulerOperator,
TopologyType: DefaultTopologyType,
CreatedAt: 0,
ID: 0,
Name: ClusterName,
MinNodeCount: DefaultNodeCount,
ShardTotal: DefaultShardTotal,
EnableSchedule: DefaultSchedulerOperator,
TopologyType: DefaultTopologyType,
MaxConcurrentOpenShardBatchSize: DefaultMaxConcurrentOpenShardBatchSize,
CreatedAt: 0,
}, clusterStorage, client, TestRootPath, DefaultIDAllocatorStep)

err := clusterMetadata.Init(ctx)
Expand Down
15 changes: 10 additions & 5 deletions server/coordinator/scheduler/assign_shard_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,16 @@ import (

// AssignShardScheduler used to assigning shards without nodes.
type AssignShardScheduler struct {
factory *coordinator.Factory
nodePicker coordinator.NodePicker
factory *coordinator.Factory
nodePicker coordinator.NodePicker
maxConcurrentOpenShardBatchSize uint32
}

func NewAssignShardScheduler(factory *coordinator.Factory, nodePicker coordinator.NodePicker) Scheduler {
func NewAssignShardScheduler(factory *coordinator.Factory, nodePicker coordinator.NodePicker, maxConcurrentOpenShardBatchSize uint32) Scheduler {
return &AssignShardScheduler{
factory: factory,
nodePicker: nodePicker,
factory: factory,
nodePicker: nodePicker,
maxConcurrentOpenShardBatchSize: maxConcurrentOpenShardBatchSize,
}
}

Expand Down Expand Up @@ -56,6 +58,9 @@ func (a AssignShardScheduler) Schedule(ctx context.Context, clusterSnapshot meta

procedures = append(procedures, p)
reasons.WriteString(fmt.Sprintf("the shard is not assigned to any node, try to assign it to node, shardID:%d, node:%s.", shardView.ShardID, newLeaderNode.Node.Name))
if len(procedures) >= int(a.maxConcurrentOpenShardBatchSize) {
break
}
}

if len(procedures) == 0 {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ func TestAssignSchedule(t *testing.T) {

procedureFactory := coordinator.NewFactory(test.MockIDAllocator{}, test.MockDispatch{}, test.NewTestStorage(t))

s := scheduler.NewAssignShardScheduler(procedureFactory, coordinator.NewConsistentHashNodePicker(zap.NewNop(), 50))
s := scheduler.NewAssignShardScheduler(procedureFactory, coordinator.NewConsistentHashNodePicker(zap.NewNop(), 50), 1)

// EmptyCluster would be scheduled an empty procedure.
emptyCluster := test.InitEmptyCluster(ctx, t)
Expand Down
19 changes: 12 additions & 7 deletions server/coordinator/scheduler/rebalanced_shard_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,18 @@ import (
)

type RebalancedShardScheduler struct {
logger *zap.Logger
factory *coordinator.Factory
nodePicker coordinator.NodePicker
logger *zap.Logger
factory *coordinator.Factory
nodePicker coordinator.NodePicker
maxConcurrentOpenShardBatchSize uint32
}

func NewRebalancedShardScheduler(logger *zap.Logger, factory *coordinator.Factory, nodePicker coordinator.NodePicker) Scheduler {
func NewRebalancedShardScheduler(logger *zap.Logger, factory *coordinator.Factory, nodePicker coordinator.NodePicker, maxConcurrentOpenShardBatchSize uint32) Scheduler {
return &RebalancedShardScheduler{
logger: logger,
factory: factory,
nodePicker: nodePicker,
logger: logger,
factory: factory,
nodePicker: nodePicker,
maxConcurrentOpenShardBatchSize: maxConcurrentOpenShardBatchSize,
}
}

Expand Down Expand Up @@ -54,6 +56,9 @@ func (r RebalancedShardScheduler) Schedule(ctx context.Context, clusterSnapshot
}
procedures = append(procedures, p)
reasons.WriteString(fmt.Sprintf("the shard does not meet the balance requirements,it should be assigned to node, shardID:%d, oldNode:%s, newNode:%s.", shardNode.ID, shardNode.NodeName, node.Node.Name))
if len(procedures) >= int(r.maxConcurrentOpenShardBatchSize) {
break
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ func TestRebalancedScheduler(t *testing.T) {

procedureFactory := coordinator.NewFactory(test.MockIDAllocator{}, test.MockDispatch{}, test.NewTestStorage(t))

s := scheduler.NewRebalancedShardScheduler(zap.NewNop(), procedureFactory, coordinator.NewConsistentHashNodePicker(zap.NewNop(), 50))
s := scheduler.NewRebalancedShardScheduler(zap.NewNop(), procedureFactory, coordinator.NewConsistentHashNodePicker(zap.NewNop(), 50), 1)

// EmptyCluster would be scheduled an empty procedure.
emptyCluster := test.InitEmptyCluster(ctx, t)
Expand Down
Loading

0 comments on commit cc70934

Please sign in to comment.