From cc7093478606538d189f2d6346d0f097a3633a62 Mon Sep 17 00:00:00 2001
From: CooooolFrog
Date: Wed, 5 Jul 2023 14:12:32 +0800
Subject: [PATCH] refactor: address code review comments

Introduce MaxConcurrentOpenShardBatchSize, which caps the number of shard
procedures each scheduler creates in a single batch. The option is plumbed
through the config, cluster metadata, storage layer, HTTP API, and all
three shard schedulers.

---
 go.mod                                        |  4 +-
 go.sum                                        |  2 -
 server/cluster/cluster.go                     |  2 +-
 server/cluster/manager.go                     | 51 ++++++++++---------
 server/cluster/metadata/cluster_metadata.go   |  7 +++
 server/cluster/metadata/types.go              | 16 +++---
 server/config/config.go                       |  7 ++-
 server/coordinator/procedure/test/common.go   | 37 +++++++-------
 .../scheduler/assign_shard_scheduler.go       | 15 ++++--
 .../scheduler/assign_shard_scheduler_test.go  |  2 +-
 .../scheduler/rebalanced_shard_scheduler.go   | 19 ++++---
 .../rebalanced_shard_scheduler_test.go        |  2 +-
 .../scheduler/scheduler_manager.go            | 44 ++++++++--------
 .../scheduler/scheduler_manager_test.go       |  6 +--
 .../static_topology_shard_scheduler.go        | 15 ++++--
 .../static_topology_shard_scheduler_test.go   |  2 +-
 server/server.go                              | 11 ++--
 server/service/http/api.go                    | 14 ++---
 server/storage/types.go                       | 47 +++++++++--------
 19 files changed, 173 insertions(+), 130 deletions(-)

diff --git a/go.mod b/go.mod
index 3a537796..774976b4 100644
--- a/go.mod
+++ b/go.mod
@@ -18,6 +18,8 @@ require (
     go.etcd.io/etcd/client/v3 v3.5.4
     go.etcd.io/etcd/server/v3 v3.5.4
     go.uber.org/zap v1.21.0
+    golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
+    golang.org/x/time v0.0.0-20210220033141-f8bda1e9f3ba
     golang.org/x/tools v0.1.10
     google.golang.org/grpc v1.47.0
     google.golang.org/protobuf v1.28.0
@@ -89,11 +91,9 @@ require (
     golang.org/x/crypto v0.0.0-20220131195533-30dcbda58838 // indirect
     golang.org/x/mod v0.6.0-dev.0.20220106191415-9b9b3d81d5e3 // indirect
     golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2 // indirect
-    golang.org/x/sync v0.0.0-20210220032951-036812b2e83c // indirect
     golang.org/x/sys v0.0.0-20211019181941-9d821ace8654 // indirect
     golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1 // indirect
     golang.org/x/text v0.3.7 // indirect
-    golang.org/x/time v0.0.0-20210220033141-f8bda1e9f3ba // indirect
     golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect
     google.golang.org/genproto v0.0.0-20210602131652-f16073e35f0c // indirect
     gopkg.in/natefinch/lumberjack.v2 v2.0.0 // indirect
diff --git a/go.sum b/go.sum
index b4f80a2c..8e1103be 100644
--- a/go.sum
+++ b/go.sum
@@ -18,10 +18,8 @@ github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03
 github.com/BurntSushi/toml v1.1.0 h1:ksErzDEI1khOiGPgpwuI7x2ebx/uXQNw7xJpn9Eq1+I=
 github.com/BurntSushi/toml v1.1.0/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbicEuybxQ=
 github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo=
-github.com/CeresDB/ceresdbproto/golang v0.0.0-20230515021908-1b3a3eae3d60 h1:+/bcJ6M6SnXWjhA80c5Qq6u+LASrPGxoDCMIZoJcmaQ=
-github.com/CeresDB/ceresdbproto/golang v0.0.0-20230515021908-1b3a3eae3d60/go.mod h1:qLTh6jtSu2ZLIFsU3iiDIKvkrQvyY/Csg6Mk0Ub0QZ4=
 github.com/CeresDB/ceresdbproto/golang v0.0.0-20230531074927-f60b148b3a5c h1:Z/FkMasq2ZTcsKsFuiUaLi26mLyx23mxwlbt1NC/eRY=
 github.com/CeresDB/ceresdbproto/golang v0.0.0-20230531074927-f60b148b3a5c/go.mod h1:qLTh6jtSu2ZLIFsU3iiDIKvkrQvyY/Csg6Mk0Ub0QZ4=
 github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU=
 github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
 github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go
index 4cd1b899..d0138981 100644
--- a/server/cluster/cluster.go
+++ b/server/cluster/cluster.go
@@ -39,7 +39,7 @@ func NewCluster(logger *zap.Logger, metadata *metadata.ClusterMetadata, client *
     dispatch := eventdispatch.NewDispatchImpl()
     procedureFactory := coordinator.NewFactory(id.NewAllocatorImpl(logger, client, defaultProcedurePrefixKey, defaultAllocStep), dispatch, procedureStorage)

-    schedulerManager := scheduler.NewManager(logger, procedureManager, procedureFactory, metadata, client, rootPath, metadata.GetEnableSchedule(), metadata.GetTopologyType())
+    schedulerManager := scheduler.NewManager(logger, procedureManager, procedureFactory, metadata, client, rootPath, metadata.GetEnableSchedule(), metadata.GetTopologyType(), metadata.GetMaxConcurrentOpenShardBatchSize())

     return &Cluster{
         logger:   logger,
diff --git a/server/cluster/manager.go b/server/cluster/manager.go
index 3696aa7e..1ae71156 100644
--- a/server/cluster/manager.go
+++ b/server/cluster/manager.go
@@ -112,14 +112,15 @@ func (m *managerImpl) CreateCluster(ctx context.Context, clusterName string, opt
     createTime := time.Now().UnixMilli()
     clusterMetadataStorage := storage.Cluster{
-        ID:             clusterID,
-        Name:           clusterName,
-        MinNodeCount:   opts.NodeCount,
-        ShardTotal:     opts.ShardTotal,
-        EnableSchedule: opts.EnableSchedule,
-        TopologyType:   opts.TopologyType,
-        CreatedAt:      uint64(createTime),
-        ModifiedAt:     uint64(createTime),
+        ID:                              clusterID,
+        Name:                            clusterName,
+        MinNodeCount:                    opts.NodeCount,
+        ShardTotal:                      opts.ShardTotal,
+        EnableSchedule:                  opts.EnableSchedule,
+        TopologyType:                    opts.TopologyType,
+        MaxConcurrentOpenShardBatchSize: opts.MaxConcurrentOpenShardBatchSize,
+        CreatedAt:                       uint64(createTime),
+        ModifiedAt:                      uint64(createTime),
     }
     err = m.storage.CreateCluster(ctx, storage.CreateClusterRequest{
         Cluster: clusterMetadataStorage,
@@ -164,14 +165,15 @@ func (m *managerImpl) UpdateCluster(ctx context.Context, clusterName string, opt
     }

     err = m.storage.UpdateCluster(ctx, storage.UpdateClusterRequest{Cluster: storage.Cluster{
-        ID:             c.GetMetadata().GetClusterID(),
-        Name:           c.GetMetadata().Name(),
-        MinNodeCount:   c.GetMetadata().GetClusterMinNodeCount(),
-        ShardTotal:     c.GetMetadata().GetTotalShardNum(),
-        EnableSchedule: opt.EnableSchedule,
-        TopologyType:   opt.TopologyType,
-        CreatedAt:      c.GetMetadata().GetCreateTime(),
-        ModifiedAt:     uint64(time.Now().UnixMilli()),
+        ID:                              c.GetMetadata().GetClusterID(),
+        Name:                            c.GetMetadata().Name(),
+        MinNodeCount:                    c.GetMetadata().GetClusterMinNodeCount(),
+        ShardTotal:                      c.GetMetadata().GetTotalShardNum(),
+        EnableSchedule:                  opt.EnableSchedule,
+        TopologyType:                    opt.TopologyType,
+        MaxConcurrentOpenShardBatchSize: opt.MaxConcurrentOpenShardBatchSize,
+        CreatedAt:                       c.GetMetadata().GetCreateTime(),
+        ModifiedAt:                      uint64(time.Now().UnixMilli()),
     }})
     if err != nil {
         log.Error("update cluster", zap.Error(err))
@@ -333,14 +335,15 @@ func (m *managerImpl) Start(ctx context.Context) error {
         if clusterMetadata.GetStorageMetadata().TopologyType == storage.TopologyTypeUnknown {
             req := storage.UpdateClusterRequest{
                 Cluster: storage.Cluster{
-                    ID:             metadataStorage.ID,
-                    Name:           metadataStorage.Name,
-                    MinNodeCount:   metadataStorage.MinNodeCount,
-                    ShardTotal:     metadataStorage.ShardTotal,
-                    EnableSchedule: metadataStorage.EnableSchedule,
-                    TopologyType:   m.topologyType,
-                    CreatedAt:      metadataStorage.CreatedAt,
-                    ModifiedAt:     uint64(time.Now().UnixMilli()),
+                    ID:                              metadataStorage.ID,
+                    Name:                            metadataStorage.Name,
+                    MinNodeCount:                    metadataStorage.MinNodeCount,
+                    ShardTotal:                      metadataStorage.ShardTotal,
+                    EnableSchedule:                  metadataStorage.EnableSchedule,
+                    TopologyType:                    m.topologyType,
+                    MaxConcurrentOpenShardBatchSize: metadataStorage.MaxConcurrentOpenShardBatchSize,
+                    CreatedAt:                       metadataStorage.CreatedAt,
+                    ModifiedAt:                      uint64(time.Now().UnixMilli()),
                 },
             }
             if err := m.storage.UpdateCluster(ctx, req); err != nil {
diff --git a/server/cluster/metadata/cluster_metadata.go b/server/cluster/metadata/cluster_metadata.go
index 781f8d92..bb068ed2 100644
--- a/server/cluster/metadata/cluster_metadata.go
+++ b/server/cluster/metadata/cluster_metadata.go
@@ -576,6 +576,13 @@ func (c *ClusterMetadata) GetTopologyType() storage.TopologyType {
     return c.metaData.TopologyType
 }

+func (c *ClusterMetadata) GetMaxConcurrentOpenShardBatchSize() uint32 {
+    c.lock.RLock()
+    defer c.lock.RUnlock()
+
+    return c.metaData.MaxConcurrentOpenShardBatchSize
+}
+
 func (c *ClusterMetadata) GetCreateTime() uint64 {
     c.lock.RLock()
     defer c.lock.RUnlock()
diff --git a/server/cluster/metadata/types.go b/server/cluster/metadata/types.go
index 08bd7531..06157b3c 100644
--- a/server/cluster/metadata/types.go
+++ b/server/cluster/metadata/types.go
@@ -47,16 +47,18 @@ type ShardNodeWithVersion struct {
 }

 type CreateClusterOpts struct {
-    NodeCount         uint32
-    ReplicationFactor uint32
-    ShardTotal        uint32
-    EnableSchedule    bool
-    TopologyType      storage.TopologyType
+    NodeCount                       uint32
+    ReplicationFactor               uint32
+    ShardTotal                      uint32
+    EnableSchedule                  bool
+    TopologyType                    storage.TopologyType
+    MaxConcurrentOpenShardBatchSize uint32
 }

 type UpdateClusterOpts struct {
-    EnableSchedule bool
-    TopologyType   storage.TopologyType
+    EnableSchedule                  bool
+    TopologyType                    storage.TopologyType
+    MaxConcurrentOpenShardBatchSize uint32
 }

 type CreateTableMetadataRequest struct {
diff --git a/server/config/config.go b/server/config/config.go
index 8ba4630f..73f3dd7e 100644
--- a/server/config/config.go
+++ b/server/config/config.go
@@ -5,6 +5,7 @@ package config
 import (
     "flag"
     "fmt"
+    "math"
     "os"
     "strings"
     "time"
@@ -52,7 +53,8 @@ const (
     defaultClusterShardTotal = 8
     enableSchedule           = true
     // topologyType is used to determine the scheduling cluster strategy of CeresMeta. It should be determined according to the storage method of CeresDB. The default is static to support local storage.
-    defaultTopologyType = "static"
+    defaultTopologyType                    = "static"
+    defaultMaxConcurrentOpenShardBatchSize = math.MaxUint32

     defaultHTTPPort = 8080

@@ -127,6 +129,8 @@ type Config struct {
     EnableSchedule bool `toml:"enable-schedule" env:"ENABLE_SCHEDULE"`
     // TopologyType indicates the schedule type used by the CeresDB cluster, it will determine the strategy of CeresMeta scheduling cluster.
     TopologyType string `toml:"topology-type" env:"TOPOLOGY_TYPE"`
+    // MaxConcurrentOpenShardBatchSize determines the maximum number of shards in a single batch when opening shards concurrently.
+    MaxConcurrentOpenShardBatchSize uint32 `toml:"max-concurrent-open-shard-batch-size" env:"MAX_CONCURRENT_OPEN_SHARD_BATCH_SIZE"`

     ClientUrls string `toml:"client-urls" env:"CLIENT_URLS"`
     PeerUrls   string `toml:"peer-urls" env:"PEER_URLS"`
@@ -296,6 +300,7 @@ func MakeConfigParser() (*Parser, error) {
         DefaultClusterShardTotal:        defaultClusterShardTotal,
         EnableSchedule:                  enableSchedule,
         TopologyType:                    defaultTopologyType,
+        MaxConcurrentOpenShardBatchSize: defaultMaxConcurrentOpenShardBatchSize,
         HTTPPort:                        defaultHTTPPort,
     }
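For reference, a minimal standalone sketch of how the new knob could be read from TOML, using the BurntSushi/toml package already present in go.sum. The key name comes from the struct tag above; the value 32 is purely illustrative, and the real parser handles the full Config struct plus env overrides.

package main

import (
    "fmt"

    "github.com/BurntSushi/toml"
)

// options mirrors only the new Config field from server/config/config.go.
type options struct {
    MaxConcurrentOpenShardBatchSize uint32 `toml:"max-concurrent-open-shard-batch-size"`
}

func main() {
    opts := new(options)
    // 32 is an arbitrary illustrative cap; the shipped default is
    // math.MaxUint32, which effectively disables the limit.
    if _, err := toml.Decode("max-concurrent-open-shard-batch-size = 32", opts); err != nil {
        panic(err)
    }
    fmt.Println(opts.MaxConcurrentOpenShardBatchSize) // prints 32
}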
diff --git a/server/coordinator/procedure/test/common.go b/server/coordinator/procedure/test/common.go
index dc120e36..dade36ec 100644
--- a/server/coordinator/procedure/test/common.go
+++ b/server/coordinator/procedure/test/common.go
@@ -6,6 +6,7 @@ import (
     "context"
     "crypto/rand"
     "fmt"
+    "math"
     "math/big"
     "testing"
     "time"
@@ -21,16 +22,17 @@ import (
 )

 const (
-    TestTableName0           = "table0"
-    TestTableName1           = "table1"
-    TestSchemaName           = "TestSchemaName"
-    TestRootPath             = "/rootPath"
-    DefaultIDAllocatorStep   = 20
-    ClusterName              = "ceresdbCluster1"
-    DefaultNodeCount         = 2
-    DefaultShardTotal        = 4
-    DefaultSchedulerOperator = true
-    DefaultTopologyType      = "static"
+    TestTableName0                         = "table0"
+    TestTableName1                         = "table1"
+    TestSchemaName                         = "TestSchemaName"
+    TestRootPath                           = "/rootPath"
+    DefaultIDAllocatorStep                 = 20
+    ClusterName                            = "ceresdbCluster1"
+    DefaultNodeCount                       = 2
+    DefaultShardTotal                      = 4
+    DefaultSchedulerOperator               = true
+    DefaultTopologyType                    = "static"
+    DefaultMaxConcurrentOpenShardBatchSize = math.MaxUint32
 )

 type MockDispatch struct{}
@@ -99,13 +101,14 @@ func InitEmptyCluster(ctx context.Context, t *testing.T) *cluster.Cluster {
     logger := zap.NewNop()

     clusterMetadata := metadata.NewClusterMetadata(logger, storage.Cluster{
-        ID:             0,
-        Name:           ClusterName,
-        MinNodeCount:   DefaultNodeCount,
-        ShardTotal:     DefaultShardTotal,
-        EnableSchedule: DefaultSchedulerOperator,
-        TopologyType:   DefaultTopologyType,
-        CreatedAt:      0,
+        ID:                              0,
+        Name:                            ClusterName,
+        MinNodeCount:                    DefaultNodeCount,
+        ShardTotal:                      DefaultShardTotal,
+        EnableSchedule:                  DefaultSchedulerOperator,
+        TopologyType:                    DefaultTopologyType,
+        MaxConcurrentOpenShardBatchSize: DefaultMaxConcurrentOpenShardBatchSize,
+        CreatedAt:                       0,
     }, clusterStorage, client, TestRootPath, DefaultIDAllocatorStep)

     err := clusterMetadata.Init(ctx)
diff --git a/server/coordinator/scheduler/assign_shard_scheduler.go b/server/coordinator/scheduler/assign_shard_scheduler.go
index 049fdf9c..1bf1d2a8 100644
--- a/server/coordinator/scheduler/assign_shard_scheduler.go
+++ b/server/coordinator/scheduler/assign_shard_scheduler.go
@@ -15,14 +15,16 @@ import (

 // AssignShardScheduler is used to assign shards that are not assigned to any node.
 type AssignShardScheduler struct {
-    factory    *coordinator.Factory
-    nodePicker coordinator.NodePicker
+    factory                         *coordinator.Factory
+    nodePicker                      coordinator.NodePicker
+    maxConcurrentOpenShardBatchSize uint32
 }

-func NewAssignShardScheduler(factory *coordinator.Factory, nodePicker coordinator.NodePicker) Scheduler {
+func NewAssignShardScheduler(factory *coordinator.Factory, nodePicker coordinator.NodePicker, maxConcurrentOpenShardBatchSize uint32) Scheduler {
     return &AssignShardScheduler{
-        factory:    factory,
-        nodePicker: nodePicker,
+        factory:                         factory,
+        nodePicker:                      nodePicker,
+        maxConcurrentOpenShardBatchSize: maxConcurrentOpenShardBatchSize,
     }
 }

@@ -56,6 +58,9 @@ func (a AssignShardScheduler) Schedule(ctx context.Context, clusterSnapshot meta
         procedures = append(procedures, p)
         reasons.WriteString(fmt.Sprintf("the shard is not assigned to any node, try to assign it to node, shardID:%d, node:%s.", shardView.ShardID, newLeaderNode.Node.Name))
+        if len(procedures) >= int(a.maxConcurrentOpenShardBatchSize) {
+            break
+        }
     }

     if len(procedures) == 0 {
diff --git a/server/coordinator/scheduler/assign_shard_scheduler_test.go b/server/coordinator/scheduler/assign_shard_scheduler_test.go
index c2269caa..1fd2aed6 100644
--- a/server/coordinator/scheduler/assign_shard_scheduler_test.go
+++ b/server/coordinator/scheduler/assign_shard_scheduler_test.go
@@ -19,7 +19,7 @@ func TestAssignSchedule(t *testing.T) {

     procedureFactory := coordinator.NewFactory(test.MockIDAllocator{}, test.MockDispatch{}, test.NewTestStorage(t))

-    s := scheduler.NewAssignShardScheduler(procedureFactory, coordinator.NewConsistentHashNodePicker(zap.NewNop(), 50))
+    s := scheduler.NewAssignShardScheduler(procedureFactory, coordinator.NewConsistentHashNodePicker(zap.NewNop(), 50), 1)

     // EmptyCluster would be scheduled an empty procedure.
     emptyCluster := test.InitEmptyCluster(ctx, t)
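The check added above is the same pattern applied in all three schedulers: append procedures for pending shards until the batch reaches the configured size, then stop. A standalone sketch of just that loop, with hypothetical names (the real loops build procedure objects, not IDs):

package main

import "fmt"

// capProcedures mirrors the loop added to each scheduler: keep collecting
// pending shards, but stop once the batch reaches maxBatchSize.
func capProcedures(pendingShardIDs []uint32, maxBatchSize uint32) []uint32 {
    batch := make([]uint32, 0, len(pendingShardIDs))
    for _, shardID := range pendingShardIDs {
        batch = append(batch, shardID)
        if len(batch) >= int(maxBatchSize) {
            break
        }
    }
    return batch
}

func main() {
    fmt.Println(capProcedures([]uint32{0, 1, 2, 3, 4}, 2)) // [0 1]
}

With the default of math.MaxUint32, int(maxBatchSize) stays a large positive number on 64-bit builds, so the break never fires and existing deployments keep the old behavior of scheduling every pending shard in one round.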
diff --git a/server/coordinator/scheduler/rebalanced_shard_scheduler.go b/server/coordinator/scheduler/rebalanced_shard_scheduler.go
index 66f032f6..35f49360 100644
--- a/server/coordinator/scheduler/rebalanced_shard_scheduler.go
+++ b/server/coordinator/scheduler/rebalanced_shard_scheduler.go
@@ -14,16 +14,18 @@ import (
 )

 type RebalancedShardScheduler struct {
-    logger     *zap.Logger
-    factory    *coordinator.Factory
-    nodePicker coordinator.NodePicker
+    logger                          *zap.Logger
+    factory                         *coordinator.Factory
+    nodePicker                      coordinator.NodePicker
+    maxConcurrentOpenShardBatchSize uint32
 }

-func NewRebalancedShardScheduler(logger *zap.Logger, factory *coordinator.Factory, nodePicker coordinator.NodePicker) Scheduler {
+func NewRebalancedShardScheduler(logger *zap.Logger, factory *coordinator.Factory, nodePicker coordinator.NodePicker, maxConcurrentOpenShardBatchSize uint32) Scheduler {
     return &RebalancedShardScheduler{
-        logger:     logger,
-        factory:    factory,
-        nodePicker: nodePicker,
+        logger:                          logger,
+        factory:                         factory,
+        nodePicker:                      nodePicker,
+        maxConcurrentOpenShardBatchSize: maxConcurrentOpenShardBatchSize,
     }
 }

@@ -54,6 +56,9 @@ func (r RebalancedShardScheduler) Schedule(ctx context.Context, clusterSnapshot
         }
         procedures = append(procedures, p)
         reasons.WriteString(fmt.Sprintf("the shard does not meet the balance requirements, it should be assigned to node, shardID:%d, oldNode:%s, newNode:%s.", shardNode.ID, shardNode.NodeName, node.Node.Name))
+        if len(procedures) >= int(r.maxConcurrentOpenShardBatchSize) {
+            break
+        }
         }
     }
diff --git a/server/coordinator/scheduler/rebalanced_shard_scheduler_test.go b/server/coordinator/scheduler/rebalanced_shard_scheduler_test.go
index d5014c64..ce926287 100644
--- a/server/coordinator/scheduler/rebalanced_shard_scheduler_test.go
+++ b/server/coordinator/scheduler/rebalanced_shard_scheduler_test.go
@@ -19,7 +19,7 @@ func TestRebalancedScheduler(t *testing.T) {

     procedureFactory := coordinator.NewFactory(test.MockIDAllocator{}, test.MockDispatch{}, test.NewTestStorage(t))

-    s := scheduler.NewRebalancedShardScheduler(zap.NewNop(), procedureFactory, coordinator.NewConsistentHashNodePicker(zap.NewNop(), 50))
+    s := scheduler.NewRebalancedShardScheduler(zap.NewNop(), procedureFactory, coordinator.NewConsistentHashNodePicker(zap.NewNop(), 50), 1)

     // EmptyCluster would be scheduled an empty procedure.
     emptyCluster := test.InitEmptyCluster(ctx, t)
diff --git a/server/coordinator/scheduler/scheduler_manager.go b/server/coordinator/scheduler/scheduler_manager.go
index c3312c69..e4a8ea22 100644
--- a/server/coordinator/scheduler/scheduler_manager.go
+++ b/server/coordinator/scheduler/scheduler_manager.go
@@ -52,15 +52,16 @@ type ManagerImpl struct {
     rootPath string

     // This lock is used to protect the following fields.
-    lock               sync.RWMutex
-    registerSchedulers []Scheduler
-    shardWatch         watch.ShardWatch
-    isRunning          atomic.Bool
-    enableSchedule     bool
-    topologyType       storage.TopologyType
+    lock                            sync.RWMutex
+    registerSchedulers              []Scheduler
+    shardWatch                      watch.ShardWatch
+    isRunning                       atomic.Bool
+    enableSchedule                  bool
+    topologyType                    storage.TopologyType
+    maxConcurrentOpenShardBatchSize uint32
 }

-func NewManager(logger *zap.Logger, procedureManager procedure.Manager, factory *coordinator.Factory, clusterMetadata *metadata.ClusterMetadata, client *clientv3.Client, rootPath string, enableSchedule bool, topologyType storage.TopologyType) Manager {
+func NewManager(logger *zap.Logger, procedureManager procedure.Manager, factory *coordinator.Factory, clusterMetadata *metadata.ClusterMetadata, client *clientv3.Client, rootPath string, enableSchedule bool, topologyType storage.TopologyType, maxConcurrentOpenShardBatchSize uint32) Manager {
     var shardWatch watch.ShardWatch
     switch topologyType {
     case storage.TopologyTypeDynamic:
@@ -71,17 +72,18 @@ func NewManager(logger *zap.Logger, procedureManager procedure.Manager, factory
     }

     return &ManagerImpl{
-        procedureManager:   procedureManager,
-        registerSchedulers: []Scheduler{},
-        factory:            factory,
-        nodePicker:         coordinator.NewConsistentHashNodePicker(logger, defaultHashReplicas),
-        clusterMetadata:    clusterMetadata,
-        client:             client,
-        shardWatch:         shardWatch,
-        rootPath:           rootPath,
-        enableSchedule:     enableSchedule,
-        topologyType:       topologyType,
-        logger:             logger,
+        procedureManager:                procedureManager,
+        registerSchedulers:              []Scheduler{},
+        factory:                         factory,
+        nodePicker:                      coordinator.NewConsistentHashNodePicker(logger, defaultHashReplicas),
+        clusterMetadata:                 clusterMetadata,
+        client:                          client,
+        shardWatch:                      shardWatch,
+        rootPath:                        rootPath,
+        enableSchedule:                  enableSchedule,
+        topologyType:                    topologyType,
+        maxConcurrentOpenShardBatchSize: maxConcurrentOpenShardBatchSize,
+        logger:                          logger,
     }
 }

@@ -189,13 +191,13 @@ func (m *ManagerImpl) initRegister() {
 }

 func (m *ManagerImpl) createStaticTopologySchedulers() []Scheduler {
-    staticTopologyShardScheduler := NewStaticTopologyShardScheduler(m.factory, m.nodePicker)
+    staticTopologyShardScheduler := NewStaticTopologyShardScheduler(m.factory, m.nodePicker, m.maxConcurrentOpenShardBatchSize)
     return []Scheduler{staticTopologyShardScheduler}
 }

 func (m *ManagerImpl) createDynamicTopologySchedulers() []Scheduler {
-    assignShardScheduler := NewAssignShardScheduler(m.factory, m.nodePicker)
-    rebalancedShardScheduler := NewRebalancedShardScheduler(m.logger, m.factory, m.nodePicker)
+    assignShardScheduler := NewAssignShardScheduler(m.factory, m.nodePicker, m.maxConcurrentOpenShardBatchSize)
+    rebalancedShardScheduler := NewRebalancedShardScheduler(m.logger, m.factory, m.nodePicker, m.maxConcurrentOpenShardBatchSize)
     return []Scheduler{assignShardScheduler, rebalancedShardScheduler}
 }
diff --git a/server/coordinator/scheduler/scheduler_manager_test.go b/server/coordinator/scheduler/scheduler_manager_test.go
index d2dec5cd..a9df691a 100644
--- a/server/coordinator/scheduler/scheduler_manager_test.go
+++ b/server/coordinator/scheduler/scheduler_manager_test.go
@@ -31,14 +31,14 @@ func TestSchedulerManager(t *testing.T) {
     _, client, _ := etcdutil.PrepareEtcdServerAndClient(t)

     // Create scheduler manager with enableScheduler equal to false.
-    schedulerManager := scheduler.NewManager(zap.NewNop(), procedureManager, f, c.GetMetadata(), client, "/rootPath", false, storage.TopologyTypeStatic)
+    schedulerManager := scheduler.NewManager(zap.NewNop(), procedureManager, f, c.GetMetadata(), client, "/rootPath", false, storage.TopologyTypeStatic, 1)
     err = schedulerManager.Start(ctx)
     re.NoError(err)
     err = schedulerManager.Stop(ctx)
     re.NoError(err)

     // Create scheduler manager with static topology.
-    schedulerManager = scheduler.NewManager(zap.NewNop(), procedureManager, f, c.GetMetadata(), client, "/rootPath", true, storage.TopologyTypeStatic)
+    schedulerManager = scheduler.NewManager(zap.NewNop(), procedureManager, f, c.GetMetadata(), client, "/rootPath", true, storage.TopologyTypeStatic, 1)
     err = schedulerManager.Start(ctx)
     re.NoError(err)
     schedulers := schedulerManager.ListScheduler()
@@ -47,7 +47,7 @@ func TestSchedulerManager(t *testing.T) {
     re.NoError(err)

     // Create scheduler manager with dynamic topology.
-    schedulerManager = scheduler.NewManager(zap.NewNop(), procedureManager, f, c.GetMetadata(), client, "/rootPath", true, storage.TopologyTypeDynamic)
+    schedulerManager = scheduler.NewManager(zap.NewNop(), procedureManager, f, c.GetMetadata(), client, "/rootPath", true, storage.TopologyTypeDynamic, 1)
     err = schedulerManager.Start(ctx)
     re.NoError(err)
     schedulers = schedulerManager.ListScheduler()
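Taken together, the manager changes mean the cap read from cluster metadata flows into every scheduler the manager registers. A dependency-free condensation of that wiring; the types here are illustrative stand-ins, not the real coordinator types:

package main

import "fmt"

type schedulerStub struct{ maxBatchSize uint32 }

type managerStub struct {
    maxConcurrentOpenShardBatchSize uint32
    schedulers                      []schedulerStub
}

// newManagerStub mirrors the shape of scheduler.NewManager plus
// createDynamicTopologySchedulers: every scheduler receives the same cap.
func newManagerStub(maxConcurrentOpenShardBatchSize uint32) *managerStub {
    m := &managerStub{maxConcurrentOpenShardBatchSize: maxConcurrentOpenShardBatchSize}
    m.schedulers = []schedulerStub{
        {maxBatchSize: m.maxConcurrentOpenShardBatchSize}, // assign
        {maxBatchSize: m.maxConcurrentOpenShardBatchSize}, // rebalanced
    }
    return m
}

func main() {
    // In the real code the value comes from metadata.GetMaxConcurrentOpenShardBatchSize().
    m := newManagerStub(32)
    fmt.Println(len(m.schedulers), m.schedulers[0].maxBatchSize) // 2 32
}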
diff --git a/server/coordinator/scheduler/static_topology_shard_scheduler.go b/server/coordinator/scheduler/static_topology_shard_scheduler.go
index 636f1a73..14ab0635 100644
--- a/server/coordinator/scheduler/static_topology_shard_scheduler.go
+++ b/server/coordinator/scheduler/static_topology_shard_scheduler.go
@@ -16,12 +16,13 @@ import (
 )

 type StaticTopologyShardScheduler struct {
-    factory    *coordinator.Factory
-    nodePicker coordinator.NodePicker
+    factory                         *coordinator.Factory
+    nodePicker                      coordinator.NodePicker
+    maxConcurrentOpenShardBatchSize uint32
 }

-func NewStaticTopologyShardScheduler(factory *coordinator.Factory, nodePicker coordinator.NodePicker) Scheduler {
-    return &StaticTopologyShardScheduler{factory: factory, nodePicker: nodePicker}
+func NewStaticTopologyShardScheduler(factory *coordinator.Factory, nodePicker coordinator.NodePicker, maxConcurrentOpenShardBatchSize uint32) Scheduler {
+    return &StaticTopologyShardScheduler{factory: factory, nodePicker: nodePicker, maxConcurrentOpenShardBatchSize: maxConcurrentOpenShardBatchSize}
 }

 func (s *StaticTopologyShardScheduler) Schedule(ctx context.Context, clusterSnapshot metadata.Snapshot) (ScheduleResult, error) {
@@ -54,6 +55,9 @@ func (s *StaticTopologyShardScheduler) Schedule(ctx context.Context, clusterSnap
             }
             procedures = append(procedures, p)
             reasons.WriteString(fmt.Sprintf("Cluster initialization, assign shard to node, shardID:%d, nodeName:%s. ", shardView.ShardID, newLeaderNode.Node.Name))
+            if len(procedures) >= int(s.maxConcurrentOpenShardBatchSize) {
+                break
+            }
         }
     case storage.ClusterStateStable:
         for i := 0; i < len(clusterSnapshot.Topology.ClusterView.ShardNodes); i++ {
@@ -75,6 +79,9 @@ func (s *StaticTopologyShardScheduler) Schedule(ctx context.Context, clusterSnap
             }
             procedures = append(procedures, p)
             reasons.WriteString(fmt.Sprintf("Cluster initialization, assign shard to node, shardID:%d, nodeName:%s. ", shardNode.ID, node.Node.Name))
+            if len(procedures) >= int(s.maxConcurrentOpenShardBatchSize) {
+                break
+            }
         }
     }

diff --git a/server/coordinator/scheduler/static_topology_shard_scheduler_test.go b/server/coordinator/scheduler/static_topology_shard_scheduler_test.go
index 47018623..c76c997a 100644
--- a/server/coordinator/scheduler/static_topology_shard_scheduler_test.go
+++ b/server/coordinator/scheduler/static_topology_shard_scheduler_test.go
@@ -19,7 +19,7 @@ func TestStaticTopologyScheduler(t *testing.T) {

     procedureFactory := coordinator.NewFactory(test.MockIDAllocator{}, test.MockDispatch{}, test.NewTestStorage(t))

-    s := scheduler.NewStaticTopologyShardScheduler(procedureFactory, coordinator.NewConsistentHashNodePicker(zap.NewNop(), 50))
+    s := scheduler.NewStaticTopologyShardScheduler(procedureFactory, coordinator.NewConsistentHashNodePicker(zap.NewNop(), 50), 1)

     // EmptyCluster would be scheduled an empty procedure.
     emptyCluster := test.InitEmptyCluster(ctx, t)
diff --git a/server/server.go b/server/server.go
index c77bf62d..893d8a2f 100644
--- a/server/server.go
+++ b/server/server.go
@@ -237,11 +237,12 @@ func (srv *Server) createDefaultCluster(ctx context.Context) error {
         }

         defaultCluster, err := srv.clusterManager.CreateCluster(ctx, srv.cfg.DefaultClusterName, metadata.CreateClusterOpts{
-            NodeCount:         uint32(srv.cfg.DefaultClusterNodeCount),
-            ReplicationFactor: uint32(srv.cfg.DefaultClusterReplicationFactor),
-            ShardTotal:        uint32(srv.cfg.DefaultClusterShardTotal),
-            EnableSchedule:    srv.cfg.EnableSchedule,
-            TopologyType:      topologyType,
+            NodeCount:                       uint32(srv.cfg.DefaultClusterNodeCount),
+            ReplicationFactor:               uint32(srv.cfg.DefaultClusterReplicationFactor),
+            ShardTotal:                      uint32(srv.cfg.DefaultClusterShardTotal),
+            EnableSchedule:                  srv.cfg.EnableSchedule,
+            TopologyType:                    topologyType,
+            MaxConcurrentOpenShardBatchSize: srv.cfg.MaxConcurrentOpenShardBatchSize,
         })
         if err != nil {
             log.Warn("create default cluster failed", zap.Error(err))
diff --git a/server/service/http/api.go b/server/service/http/api.go
index 6eb4aaaa..51c75a09 100644
--- a/server/service/http/api.go
+++ b/server/service/http/api.go
@@ -542,10 +542,11 @@ func (a *API) createCluster(writer http.ResponseWriter, req *http.Request) {
 }

 type UpdateClusterRequest struct {
-    NodeCount      uint32 `json:"NodeCount"`
-    ShardTotal     uint32 `json:"ShardTotal"`
-    EnableSchedule bool   `json:"enableSchedule"`
-    TopologyType   string `json:"topologyType"`
+    NodeCount                       uint32 `json:"nodeCount"`
+    ShardTotal                      uint32 `json:"shardTotal"`
+    EnableSchedule                  bool   `json:"enableSchedule"`
+    TopologyType                    string `json:"topologyType"`
+    MaxConcurrentOpenShardBatchSize uint32 `json:"maxConcurrentOpenShardBatchSize"`
 }

 func (a *API) updateCluster(writer http.ResponseWriter, req *http.Request) {
@@ -590,8 +591,9 @@ func (a *API) updateCluster(writer http.ResponseWriter, req *http.Request) {
     }

     if err := a.clusterManager.UpdateCluster(req.Context(), clusterName, metadata.UpdateClusterOpts{
-        EnableSchedule: updateClusterRequest.EnableSchedule,
-        TopologyType:   topologyType,
+        EnableSchedule:                  updateClusterRequest.EnableSchedule,
+        TopologyType:                    topologyType,
+        MaxConcurrentOpenShardBatchSize: updateClusterRequest.MaxConcurrentOpenShardBatchSize,
     }); err != nil {
         log.Error("update cluster failed", zap.Error(err))
         a.respondError(writer, metadata.ErrUpdateCluster, fmt.Sprintf("err: %s", err.Error()))
diff --git a/server/storage/types.go b/server/storage/types.go
index a80cb826..5b23e3b8 100644
--- a/server/storage/types.go
+++ b/server/storage/types.go
@@ -152,12 +152,13 @@ type Cluster struct {
     Name         string
     MinNodeCount uint32
     // Deprecated: ReplicationFactor is deprecated after CeresMeta v1.2.0
-    ReplicationFactor uint32
-    ShardTotal        uint32
-    EnableSchedule    bool
-    TopologyType      TopologyType
-    CreatedAt         uint64
-    ModifiedAt        uint64
+    ReplicationFactor               uint32
+    ShardTotal                      uint32
+    EnableSchedule                  bool
+    TopologyType                    TopologyType
+    MaxConcurrentOpenShardBatchSize uint32
+    CreatedAt                       uint64
+    ModifiedAt                      uint64
 }

 type ShardNode struct {
@@ -258,27 +259,29 @@ func convertNodeToPB(node Node) clusterpb.Node {

 func convertClusterPB(cluster *clusterpb.Cluster) Cluster {
     return Cluster{
-        ID:             ClusterID(cluster.Id),
-        Name:           cluster.Name,
-        MinNodeCount:   cluster.MinNodeCount,
-        ShardTotal:     cluster.ShardTotal,
-        EnableSchedule: cluster.EnableSchedule,
-        TopologyType:   convertTopologyTypePB(cluster.TopologyType),
-        CreatedAt:      cluster.CreatedAt,
-        ModifiedAt:     cluster.ModifiedAt,
+        ID:                              ClusterID(cluster.Id),
+        Name:                            cluster.Name,
+        MinNodeCount:                    cluster.MinNodeCount,
+        ShardTotal:                      cluster.ShardTotal,
+        EnableSchedule:                  cluster.EnableSchedule,
+        TopologyType:                    convertTopologyTypePB(cluster.TopologyType),
+        MaxConcurrentOpenShardBatchSize: cluster.MaxConcurrentOpenShardBatchSize,
+        CreatedAt:                       cluster.CreatedAt,
+        ModifiedAt:                      cluster.ModifiedAt,
     }
 }

 func convertClusterToPB(cluster Cluster) clusterpb.Cluster {
     return clusterpb.Cluster{
-        Id:             uint32(cluster.ID),
-        Name:           cluster.Name,
-        MinNodeCount:   cluster.MinNodeCount,
-        ShardTotal:     cluster.ShardTotal,
-        EnableSchedule: cluster.EnableSchedule,
-        TopologyType:   convertTopologyTypeToPB(cluster.TopologyType),
-        CreatedAt:      cluster.CreatedAt,
-        ModifiedAt:     cluster.ModifiedAt,
+        Id:                              uint32(cluster.ID),
+        Name:                            cluster.Name,
+        MinNodeCount:                    cluster.MinNodeCount,
+        ShardTotal:                      cluster.ShardTotal,
+        EnableSchedule:                  cluster.EnableSchedule,
+        TopologyType:                    convertTopologyTypeToPB(cluster.TopologyType),
+        MaxConcurrentOpenShardBatchSize: cluster.MaxConcurrentOpenShardBatchSize,
+        CreatedAt:                       cluster.CreatedAt,
+        ModifiedAt:                      cluster.ModifiedAt,
     }
 }
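End to end, the cap can now be set per cluster over HTTP. A small standalone sketch that prints a request body accepted by the updateCluster handler above; the struct mirrors UpdateClusterRequest, and all values are illustrative (note the conversion functions here assume clusterpb.Cluster already defines MaxConcurrentOpenShardBatchSize, i.e. a ceresdbproto version that carries the new field):

package main

import (
    "encoding/json"
    "fmt"
)

// updateClusterRequest mirrors the UpdateClusterRequest body in
// server/service/http/api.go.
type updateClusterRequest struct {
    NodeCount                       uint32 `json:"nodeCount"`
    ShardTotal                      uint32 `json:"shardTotal"`
    EnableSchedule                  bool   `json:"enableSchedule"`
    TopologyType                    string `json:"topologyType"`
    MaxConcurrentOpenShardBatchSize uint32 `json:"maxConcurrentOpenShardBatchSize"`
}

func main() {
    body, err := json.MarshalIndent(updateClusterRequest{
        EnableSchedule:                  true,
        TopologyType:                    "dynamic",
        MaxConcurrentOpenShardBatchSize: 32, // arbitrary illustrative cap
    }, "", "  ")
    if err != nil {
        panic(err)
    }
    fmt.Println(string(body))
}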