diff --git a/go.mod b/go.mod index 3a537796..181a8825 100644 --- a/go.mod +++ b/go.mod @@ -4,7 +4,7 @@ go 1.19 require ( github.com/AlekSi/gocov-xml v1.0.0 - github.com/CeresDB/ceresdbproto/golang v0.0.0-20230531074927-f60b148b3a5c + github.com/CeresDB/ceresdbproto/golang v0.0.0-20230707021308-fba75ce6409c github.com/axw/gocov v1.1.0 github.com/caarlos0/env/v6 v6.10.1 github.com/julienschmidt/httprouter v1.3.0 @@ -18,6 +18,8 @@ require ( go.etcd.io/etcd/client/v3 v3.5.4 go.etcd.io/etcd/server/v3 v3.5.4 go.uber.org/zap v1.21.0 + golang.org/x/sync v0.0.0-20210220032951-036812b2e83c + golang.org/x/time v0.0.0-20210220033141-f8bda1e9f3ba golang.org/x/tools v0.1.10 google.golang.org/grpc v1.47.0 google.golang.org/protobuf v1.28.0 @@ -83,17 +85,15 @@ require ( go.opentelemetry.io/otel/sdk/export/metric v0.20.0 // indirect go.opentelemetry.io/otel/sdk/metric v0.20.0 // indirect go.opentelemetry.io/otel/trace v0.20.0 // indirect - go.opentelemetry.io/proto/otlp v0.7.0 // indirect + go.opentelemetry.io/proto/otlp v0.7.0 // indirect go.uber.org/atomic v1.9.0 // indirect go.uber.org/multierr v1.7.0 // indirect golang.org/x/crypto v0.0.0-20220131195533-30dcbda58838 // indirect golang.org/x/mod v0.6.0-dev.0.20220106191415-9b9b3d81d5e3 // indirect golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2 // indirect - golang.org/x/sync v0.0.0-20210220032951-036812b2e83c // indirect golang.org/x/sys v0.0.0-20211019181941-9d821ace8654 // indirect golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1 // indirect golang.org/x/text v0.3.7 // indirect - golang.org/x/time v0.0.0-20210220033141-f8bda1e9f3ba // indirect golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect google.golang.org/genproto v0.0.0-20210602131652-f16073e35f0c // indirect gopkg.in/natefinch/lumberjack.v2 v2.0.0 // indirect diff --git a/go.sum b/go.sum index b4f80a2c..f72d4375 100644 --- a/go.sum +++ b/go.sum @@ -18,10 +18,10 @@ github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03 github.com/BurntSushi/toml v1.1.0 h1:ksErzDEI1khOiGPgpwuI7x2ebx/uXQNw7xJpn9Eq1+I= github.com/BurntSushi/toml v1.1.0/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbicEuybxQ= github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo= -github.com/CeresDB/ceresdbproto/golang v0.0.0-20230515021908-1b3a3eae3d60 h1:+/bcJ6M6SnXWjhA80c5Qq6u+LASrPGxoDCMIZoJcmaQ= -github.com/CeresDB/ceresdbproto/golang v0.0.0-20230515021908-1b3a3eae3d60/go.mod h1:qLTh6jtSu2ZLIFsU3iiDIKvkrQvyY/Csg6Mk0Ub0QZ4= -github.com/CeresDB/ceresdbproto/golang v0.0.0-20230531074927-f60b148b3a5c h1:Z/FkMasq2ZTcsKsFuiUaLi26mLyx23mxwlbt1NC/eRY= -github.com/CeresDB/ceresdbproto/golang v0.0.0-20230531074927-f60b148b3a5c/go.mod h1:qLTh6jtSu2ZLIFsU3iiDIKvkrQvyY/Csg6Mk0Ub0QZ4= +github.com/CeresDB/ceresdbproto/golang v0.0.0-20230707015251-b771ff78a281 h1:F0pmf85TCDi6/UJoika1eYGDLuJSdEGvOKMb7Ha3u3s= +github.com/CeresDB/ceresdbproto/golang v0.0.0-20230707015251-b771ff78a281/go.mod h1:qLTh6jtSu2ZLIFsU3iiDIKvkrQvyY/Csg6Mk0Ub0QZ4= +github.com/CeresDB/ceresdbproto/golang v0.0.0-20230707021308-fba75ce6409c h1:7gmSQsGua+Y1g6ygsC/K75T/zK2ki7y5R5BkrN1/Ymc= +github.com/CeresDB/ceresdbproto/golang v0.0.0-20230707021308-fba75ce6409c/go.mod h1:qLTh6jtSu2ZLIFsU3iiDIKvkrQvyY/Csg6Mk0Ub0QZ4= github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU= github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= github.com/alecthomas/template
v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index 4cd1b899..d08ac03f 100644 --- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -39,7 +39,7 @@ func NewCluster(logger *zap.Logger, metadata *metadata.ClusterMetadata, client * dispatch := eventdispatch.NewDispatchImpl() procedureFactory := coordinator.NewFactory(id.NewAllocatorImpl(logger, client, defaultProcedurePrefixKey, defaultAllocStep), dispatch, procedureStorage) - schedulerManager := scheduler.NewManager(logger, procedureManager, procedureFactory, metadata, client, rootPath, metadata.GetEnableSchedule(), metadata.GetTopologyType()) + schedulerManager := scheduler.NewManager(logger, procedureManager, procedureFactory, metadata, client, rootPath, metadata.GetEnableSchedule(), metadata.GetTopologyType(), metadata.GetProcedureExecutingBatchSize()) return &Cluster{ logger: logger, diff --git a/server/cluster/manager.go b/server/cluster/manager.go index 3696aa7e..fea613da 100644 --- a/server/cluster/manager.go +++ b/server/cluster/manager.go @@ -112,14 +112,15 @@ func (m *managerImpl) CreateCluster(ctx context.Context, clusterName string, opt createTime := time.Now().UnixMilli() clusterMetadataStorage := storage.Cluster{ - ID: clusterID, - Name: clusterName, - MinNodeCount: opts.NodeCount, - ShardTotal: opts.ShardTotal, - EnableSchedule: opts.EnableSchedule, - TopologyType: opts.TopologyType, - CreatedAt: uint64(createTime), - ModifiedAt: uint64(createTime), + ID: clusterID, + Name: clusterName, + MinNodeCount: opts.NodeCount, + ShardTotal: opts.ShardTotal, + EnableSchedule: opts.EnableSchedule, + TopologyType: opts.TopologyType, + ProcedureExecutingBatchSize: opts.ProcedureExecutingBatchSize, + CreatedAt: uint64(createTime), + ModifiedAt: uint64(createTime), } err = m.storage.CreateCluster(ctx, storage.CreateClusterRequest{ Cluster: clusterMetadataStorage, @@ -164,14 +165,15 @@ func (m *managerImpl) UpdateCluster(ctx context.Context, clusterName string, opt } err = m.storage.UpdateCluster(ctx, storage.UpdateClusterRequest{Cluster: storage.Cluster{ - ID: c.GetMetadata().GetClusterID(), - Name: c.GetMetadata().Name(), - MinNodeCount: c.GetMetadata().GetClusterMinNodeCount(), - ShardTotal: c.GetMetadata().GetTotalShardNum(), - EnableSchedule: opt.EnableSchedule, - TopologyType: opt.TopologyType, - CreatedAt: c.GetMetadata().GetCreateTime(), - ModifiedAt: uint64(time.Now().UnixMilli()), + ID: c.GetMetadata().GetClusterID(), + Name: c.GetMetadata().Name(), + MinNodeCount: c.GetMetadata().GetClusterMinNodeCount(), + ShardTotal: c.GetMetadata().GetTotalShardNum(), + EnableSchedule: opt.EnableSchedule, + TopologyType: opt.TopologyType, + ProcedureExecutingBatchSize: opt.ProcedureExecutingBatchSize, + CreatedAt: c.GetMetadata().GetCreateTime(), + ModifiedAt: uint64(time.Now().UnixMilli()), }}) if err != nil { log.Error("update cluster", zap.Error(err)) @@ -333,14 +335,15 @@ func (m *managerImpl) Start(ctx context.Context) error { if clusterMetadata.GetStorageMetadata().TopologyType == storage.TopologyTypeUnknown { req := storage.UpdateClusterRequest{ Cluster: storage.Cluster{ - ID: metadataStorage.ID, - Name: metadataStorage.Name, - MinNodeCount: metadataStorage.MinNodeCount, - ShardTotal: metadataStorage.ShardTotal, - EnableSchedule: metadataStorage.EnableSchedule, - TopologyType: m.topologyType, - CreatedAt: metadataStorage.CreatedAt, - ModifiedAt: uint64(time.Now().UnixMilli()), + ID: metadataStorage.ID, + Name: 
metadataStorage.Name, + MinNodeCount: metadataStorage.MinNodeCount, + ShardTotal: metadataStorage.ShardTotal, + EnableSchedule: metadataStorage.EnableSchedule, + TopologyType: m.topologyType, + ProcedureExecutingBatchSize: metadataStorage.ProcedureExecutingBatchSize, + CreatedAt: metadataStorage.CreatedAt, + ModifiedAt: uint64(time.Now().UnixMilli()), }, } if err := m.storage.UpdateCluster(ctx, req); err != nil { diff --git a/server/cluster/metadata/cluster_metadata.go b/server/cluster/metadata/cluster_metadata.go index 781f8d92..112f98aa 100644 --- a/server/cluster/metadata/cluster_metadata.go +++ b/server/cluster/metadata/cluster_metadata.go @@ -576,6 +576,13 @@ func (c *ClusterMetadata) GetTopologyType() storage.TopologyType { return c.metaData.TopologyType } +func (c *ClusterMetadata) GetProcedureExecutingBatchSize() uint32 { + c.lock.RLock() + defer c.lock.RUnlock() + + return c.metaData.ProcedureExecutingBatchSize +} + func (c *ClusterMetadata) GetCreateTime() uint64 { c.lock.RLock() defer c.lock.RUnlock() diff --git a/server/cluster/metadata/types.go b/server/cluster/metadata/types.go index 08bd7531..bcac4210 100644 --- a/server/cluster/metadata/types.go +++ b/server/cluster/metadata/types.go @@ -47,16 +47,18 @@ type ShardNodeWithVersion struct { } type CreateClusterOpts struct { - NodeCount uint32 - ReplicationFactor uint32 - ShardTotal uint32 - EnableSchedule bool - TopologyType storage.TopologyType + NodeCount uint32 + ReplicationFactor uint32 + ShardTotal uint32 + EnableSchedule bool + TopologyType storage.TopologyType + ProcedureExecutingBatchSize uint32 } type UpdateClusterOpts struct { - EnableSchedule bool - TopologyType storage.TopologyType + EnableSchedule bool + TopologyType storage.TopologyType + ProcedureExecutingBatchSize uint32 } type CreateTableMetadataRequest struct { diff --git a/server/config/config.go b/server/config/config.go index 8ba4630f..f7281658 100644 --- a/server/config/config.go +++ b/server/config/config.go @@ -5,6 +5,7 @@ package config import ( "flag" "fmt" + "math" "os" "strings" "time" @@ -52,7 +53,8 @@ const ( defaultClusterShardTotal = 8 enableSchedule = true // topologyType is used to determine the scheduling cluster strategy of CeresMeta. It should be determined according to the storage method of CeresDB. The default is static to support local storage. - defaultTopologyType = "static" + defaultTopologyType = "static" + defaultProcedureExecutingBatchSize = math.MaxUint32 defaultHTTPPort = 8080 @@ -127,6 +129,8 @@ type Config struct { EnableSchedule bool `toml:"enable-schedule" env:"ENABLE_SCHEDULE"` // TopologyType indicates the schedule type used by the CeresDB cluster, it will determine the strategy of CeresMeta scheduling cluster. TopologyType string `toml:"topology-type" env:"TOPOLOGY_TYPE"` + // ProcedureExecutingBatchSize determines the maximum number of shards in a single batch when opening shards concurrently. 
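// A hedged usage sketch (the value 100 is illustrative, not a default taken from this change):
// the field below can be set in the TOML config as `procedure-executing-batch-size = 100`, or
// through the environment as `PROCEDURE_EXECUTING_BATCH_SIZE=100`, per its toml/env tags.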
+ ProcedureExecutingBatchSize uint32 `toml:"procedure-executing-batch-size" env:"PROCEDURE_EXECUTING_BATCH_SIZE"` ClientUrls string `toml:"client-urls" env:"CLIENT_URLS"` PeerUrls string `toml:"peer-urls" env:"PEER_URLS"` @@ -296,6 +300,7 @@ func MakeConfigParser() (*Parser, error) { DefaultClusterShardTotal: defaultClusterShardTotal, EnableSchedule: enableSchedule, TopologyType: defaultTopologyType, + ProcedureExecutingBatchSize: defaultProcedureExecutingBatchSize, HTTPPort: defaultHTTPPort, } diff --git a/server/coordinator/procedure/manager_impl.go b/server/coordinator/procedure/manager_impl.go index ff37a82c..d60437d1 100644 --- a/server/coordinator/procedure/manager_impl.go +++ b/server/coordinator/procedure/manager_impl.go @@ -75,6 +75,7 @@ func (m *ManagerImpl) Stop(ctx context.Context) error { return nil } +// TODO: Filter duplicate submitted Procedure. func (m *ManagerImpl) Submit(_ context.Context, procedure Procedure) error { if err := m.waitingProcedures.Push(procedure, 0); err != nil { return err diff --git a/server/coordinator/procedure/operation/transferleader/batch_transfer_leader.go b/server/coordinator/procedure/operation/transferleader/batch_transfer_leader.go index 4aa4f54f..791ece36 100644 --- a/server/coordinator/procedure/operation/transferleader/batch_transfer_leader.go +++ b/server/coordinator/procedure/operation/transferleader/batch_transfer_leader.go @@ -63,7 +63,7 @@ func buildBatchRelatedVersionInfo(batch []procedure.Procedure) (procedure.Relate for shardID, version := range p.RelatedVersionInfo().ShardWithVersion { if resultVersion, exists := result.ShardWithVersion[shardID]; exists { if version != resultVersion { - return procedure.RelatedVersionInfo{}, errors.WithMessage(procedure.ErrMergeBatchProcedure, "procedure shardVersion in the same batch is inconsistent") + return procedure.RelatedVersionInfo{}, errors.WithMessage(procedure.ErrMergeBatchProcedure, fmt.Sprintf("procedure shardVersion in the same batch is inconsistent, shardID: %d, expectedShardVersion: %d, shardVersion: %d", shardID, version, resultVersion)) } } else { result.ShardWithVersion[shardID] = version diff --git a/server/coordinator/procedure/operation/transferleader/batch_transfer_leader_test.go b/server/coordinator/procedure/operation/transferleader/batch_transfer_leader_test.go index ba72b03b..4838ccbd 100644 --- a/server/coordinator/procedure/operation/transferleader/batch_transfer_leader_test.go +++ b/server/coordinator/procedure/operation/transferleader/batch_transfer_leader_test.go @@ -53,34 +53,23 @@ func (m mockProcedure) Priority() procedure.Priority { func TestBatchProcedure(t *testing.T) { re := require.New(t) - var procedures []procedure.Procedure // Procedures with same type and version. for i := 0; i < 3; i++ { shardWithVersion := map[storage.ShardID]uint64{} shardWithVersion[storage.ShardID(i)] = 0 - p := mockProcedure{ - ClusterID: 0, - clusterVersion: 0, - typ: 0, - ShardWithVersion: shardWithVersion, - } + p := CreateMockProcedure(storage.ClusterID(0), 0, 0, shardWithVersion) procedures = append(procedures, p) } _, err := transferleader.NewBatchTransferLeaderProcedure(0, procedures) re.NoError(err) - // Procedure with different clusterID + // Procedure with different clusterID.
for i := 0; i < 3; i++ { shardWithVersion := map[storage.ShardID]uint64{} shardWithVersion[storage.ShardID(i)] = 0 - p := mockProcedure{ - ClusterID: storage.ClusterID(i), - clusterVersion: 0, - typ: procedure.TransferLeader, - ShardWithVersion: shardWithVersion, - } + p := CreateMockProcedure(storage.ClusterID(i), 0, procedure.TransferLeader, shardWithVersion) procedures = append(procedures, p) } _, err = transferleader.NewBatchTransferLeaderProcedure(0, procedures) @@ -90,12 +79,7 @@ func TestBatchProcedure(t *testing.T) { for i := 0; i < 3; i++ { shardWithVersion := map[storage.ShardID]uint64{} shardWithVersion[storage.ShardID(i)] = 0 - p := mockProcedure{ - ClusterID: 0, - clusterVersion: 0, - typ: procedure.Typ(i), - ShardWithVersion: shardWithVersion, - } + p := CreateMockProcedure(0, 0, procedure.Typ(i), shardWithVersion) procedures = append(procedures, p) } _, err = transferleader.NewBatchTransferLeaderProcedure(0, procedures) @@ -105,14 +89,18 @@ func TestBatchProcedure(t *testing.T) { for i := 0; i < 3; i++ { shardWithVersion := map[storage.ShardID]uint64{} shardWithVersion[storage.ShardID(0)] = uint64(i) - p := mockProcedure{ - ClusterID: 0, - clusterVersion: 0, - typ: procedure.Typ(i), - ShardWithVersion: shardWithVersion, - } + p := CreateMockProcedure(0, 0, procedure.Typ(i), shardWithVersion) procedures = append(procedures, p) } _, err = transferleader.NewBatchTransferLeaderProcedure(0, procedures) re.Error(err) } + +func CreateMockProcedure(clusterID storage.ClusterID, clusterVersion uint64, typ procedure.Typ, shardWithVersion map[storage.ShardID]uint64) procedure.Procedure { + return mockProcedure{ + ClusterID: clusterID, + clusterVersion: clusterVersion, + typ: typ, + ShardWithVersion: shardWithVersion, + } +} diff --git a/server/coordinator/procedure/test/common.go b/server/coordinator/procedure/test/common.go index dc120e36..b267d846 100644 --- a/server/coordinator/procedure/test/common.go +++ b/server/coordinator/procedure/test/common.go @@ -6,6 +6,7 @@ import ( "context" "crypto/rand" "fmt" + "math" "math/big" "testing" "time" @@ -21,16 +22,17 @@ import ( ) const ( - TestTableName0 = "table0" - TestTableName1 = "table1" - TestSchemaName = "TestSchemaName" - TestRootPath = "/rootPath" - DefaultIDAllocatorStep = 20 - ClusterName = "ceresdbCluster1" - DefaultNodeCount = 2 - DefaultShardTotal = 4 - DefaultSchedulerOperator = true - DefaultTopologyType = "static" + TestTableName0 = "table0" + TestTableName1 = "table1" + TestSchemaName = "TestSchemaName" + TestRootPath = "/rootPath" + DefaultIDAllocatorStep = 20 + ClusterName = "ceresdbCluster1" + DefaultNodeCount = 2 + DefaultShardTotal = 4 + DefaultSchedulerOperator = true + DefaultTopologyType = "static" + DefaultProcedureExecutingBatchSize = math.MaxUint32 ) type MockDispatch struct{} @@ -99,13 +101,14 @@ func InitEmptyCluster(ctx context.Context, t *testing.T) *cluster.Cluster { logger := zap.NewNop() clusterMetadata := metadata.NewClusterMetadata(logger, storage.Cluster{ - ID: 0, - Name: ClusterName, - MinNodeCount: DefaultNodeCount, - ShardTotal: DefaultShardTotal, - EnableSchedule: DefaultSchedulerOperator, - TopologyType: DefaultTopologyType, - CreatedAt: 0, + ID: 0, + Name: ClusterName, + MinNodeCount: DefaultNodeCount, + ShardTotal: DefaultShardTotal, + EnableSchedule: DefaultSchedulerOperator, + TopologyType: DefaultTopologyType, + ProcedureExecutingBatchSize: DefaultProcedureExecutingBatchSize, + CreatedAt: 0, }, clusterStorage, client, TestRootPath, DefaultIDAllocatorStep) err := clusterMetadata.Init(ctx) diff 
--git a/server/coordinator/scheduler/assign_shard_scheduler.go b/server/coordinator/scheduler/assign_shard_scheduler.go index 1856a643..9d1ebdeb 100644 --- a/server/coordinator/scheduler/assign_shard_scheduler.go +++ b/server/coordinator/scheduler/assign_shard_scheduler.go @@ -15,14 +15,16 @@ import ( // AssignShardScheduler used to assigning shards without nodes. type AssignShardScheduler struct { - factory *coordinator.Factory - nodePicker coordinator.NodePicker + factory *coordinator.Factory + nodePicker coordinator.NodePicker + procedureExecutingBatchSize uint32 } -func NewAssignShardScheduler(factory *coordinator.Factory, nodePicker coordinator.NodePicker) Scheduler { +func NewAssignShardScheduler(factory *coordinator.Factory, nodePicker coordinator.NodePicker, procedureExecutingBatchSize uint32) Scheduler { return &AssignShardScheduler{ - factory: factory, - nodePicker: nodePicker, + factory: factory, + nodePicker: nodePicker, + procedureExecutingBatchSize: procedureExecutingBatchSize, } } @@ -55,7 +57,10 @@ func (a AssignShardScheduler) Schedule(ctx context.Context, clusterSnapshot meta } procedures = append(procedures, p) - reasons.WriteString(fmt.Sprintf("the shard:%d is not assigned to any node, try to assign it to node:%s", shardView.ShardID, newLeaderNode.Node.Name)) + reasons.WriteString(fmt.Sprintf("the shard is not assigned to any node, try to assign it to node, shardID:%d, node:%s.", shardView.ShardID, newLeaderNode.Node.Name)) + if len(procedures) >= int(a.procedureExecutingBatchSize) { + break + } } if len(procedures) == 0 { diff --git a/server/coordinator/scheduler/assign_shard_scheduler_test.go b/server/coordinator/scheduler/assign_shard_scheduler_test.go index c2269caa..1fd2aed6 100644 --- a/server/coordinator/scheduler/assign_shard_scheduler_test.go +++ b/server/coordinator/scheduler/assign_shard_scheduler_test.go @@ -19,7 +19,7 @@ func TestAssignSchedule(t *testing.T) { procedureFactory := coordinator.NewFactory(test.MockIDAllocator{}, test.MockDispatch{}, test.NewTestStorage(t)) - s := scheduler.NewAssignShardScheduler(procedureFactory, coordinator.NewConsistentHashNodePicker(zap.NewNop(), 50)) + s := scheduler.NewAssignShardScheduler(procedureFactory, coordinator.NewConsistentHashNodePicker(zap.NewNop(), 50), 1) // EmptyCluster would be scheduled an empty procedure. 
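// (The trailing `1` passed below is the new procedureExecutingBatchSize argument, so each
// call to Schedule emits at most one procedure.)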
emptyCluster := test.InitEmptyCluster(ctx, t) diff --git a/server/coordinator/scheduler/rebalanced_shard_scheduler.go b/server/coordinator/scheduler/rebalanced_shard_scheduler.go index c4095747..63b70feb 100644 --- a/server/coordinator/scheduler/rebalanced_shard_scheduler.go +++ b/server/coordinator/scheduler/rebalanced_shard_scheduler.go @@ -14,16 +14,18 @@ import ( ) type RebalancedShardScheduler struct { - logger *zap.Logger - factory *coordinator.Factory - nodePicker coordinator.NodePicker + logger *zap.Logger + factory *coordinator.Factory + nodePicker coordinator.NodePicker + procedureExecutingBatchSize uint32 } -func NewRebalancedShardScheduler(logger *zap.Logger, factory *coordinator.Factory, nodePicker coordinator.NodePicker) Scheduler { +func NewRebalancedShardScheduler(logger *zap.Logger, factory *coordinator.Factory, nodePicker coordinator.NodePicker, procedureExecutingBatchSize uint32) Scheduler { return &RebalancedShardScheduler{ - logger: logger, - factory: factory, - nodePicker: nodePicker, + logger: logger, + factory: factory, + nodePicker: nodePicker, + procedureExecutingBatchSize: procedureExecutingBatchSize, } } @@ -53,7 +55,10 @@ func (r RebalancedShardScheduler) Schedule(ctx context.Context, clusterSnapshot return ScheduleResult{}, err } procedures = append(procedures, p) - reasons.WriteString(fmt.Sprintf("the shard:%d on the node:%s does not meet the balance requirements,it should be assigned to node:%s \n", shardNode.ID, shardNode.NodeName, node.Node.Name)) + reasons.WriteString(fmt.Sprintf("the shard does not meet the balance requirements, it should be assigned to node, shardID:%d, oldNode:%s, newNode:%s.", shardNode.ID, shardNode.NodeName, node.Node.Name)) + if len(procedures) >= int(r.procedureExecutingBatchSize) { + break + } } } diff --git a/server/coordinator/scheduler/rebalanced_shard_scheduler_test.go b/server/coordinator/scheduler/rebalanced_shard_scheduler_test.go index d5014c64..ce926287 100644 --- a/server/coordinator/scheduler/rebalanced_shard_scheduler_test.go +++ b/server/coordinator/scheduler/rebalanced_shard_scheduler_test.go @@ -19,7 +19,7 @@ func TestRebalancedScheduler(t *testing.T) { procedureFactory := coordinator.NewFactory(test.MockIDAllocator{}, test.MockDispatch{}, test.NewTestStorage(t)) - s := scheduler.NewRebalancedShardScheduler(zap.NewNop(), procedureFactory, coordinator.NewConsistentHashNodePicker(zap.NewNop(), 50)) + s := scheduler.NewRebalancedShardScheduler(zap.NewNop(), procedureFactory, coordinator.NewConsistentHashNodePicker(zap.NewNop(), 50), 1) // EmptyCluster would be scheduled an empty procedure. emptyCluster := test.InitEmptyCluster(ctx, t) diff --git a/server/coordinator/scheduler/scheduler_manager.go b/server/coordinator/scheduler/scheduler_manager.go index 833d48a3..509da095 100644 --- a/server/coordinator/scheduler/scheduler_manager.go +++ b/server/coordinator/scheduler/scheduler_manager.go @@ -21,7 +21,7 @@ import ( ) const ( - schedulerInterval = time.Second * 5 + schedulerInterval = time.Second * 15 defaultHashReplicas = 50 ) @@ -52,15 +52,16 @@ type ManagerImpl struct { rootPath string // This lock is used to protect the following field.
- lock sync.RWMutex - registerSchedulers []Scheduler - shardWatch watch.ShardWatch - isRunning atomic.Bool - enableSchedule bool - topologyType storage.TopologyType + lock sync.RWMutex + registerSchedulers []Scheduler + shardWatch watch.ShardWatch + isRunning atomic.Bool + enableSchedule bool + topologyType storage.TopologyType + procedureExecutingBatchSize uint32 } -func NewManager(logger *zap.Logger, procedureManager procedure.Manager, factory *coordinator.Factory, clusterMetadata *metadata.ClusterMetadata, client *clientv3.Client, rootPath string, enableSchedule bool, topologyType storage.TopologyType) Manager { +func NewManager(logger *zap.Logger, procedureManager procedure.Manager, factory *coordinator.Factory, clusterMetadata *metadata.ClusterMetadata, client *clientv3.Client, rootPath string, enableSchedule bool, topologyType storage.TopologyType, procedureExecutingBatchSize uint32) Manager { var shardWatch watch.ShardWatch switch topologyType { case storage.TopologyTypeDynamic: @@ -71,17 +72,18 @@ func NewManager(logger *zap.Logger, procedureManager procedure.Manager, factory } return &ManagerImpl{ - procedureManager: procedureManager, - registerSchedulers: []Scheduler{}, - factory: factory, - nodePicker: coordinator.NewConsistentHashNodePicker(logger, defaultHashReplicas), - clusterMetadata: clusterMetadata, - client: client, - shardWatch: shardWatch, - rootPath: rootPath, - enableSchedule: enableSchedule, - topologyType: topologyType, - logger: logger, + procedureManager: procedureManager, + registerSchedulers: []Scheduler{}, + factory: factory, + nodePicker: coordinator.NewConsistentHashNodePicker(logger, defaultHashReplicas), + clusterMetadata: clusterMetadata, + client: client, + shardWatch: shardWatch, + rootPath: rootPath, + enableSchedule: enableSchedule, + topologyType: topologyType, + procedureExecutingBatchSize: procedureExecutingBatchSize, + logger: logger, } } @@ -189,13 +191,13 @@ func (m *ManagerImpl) initRegister() { } func (m *ManagerImpl) createStaticTopologySchedulers() []Scheduler { - staticTopologyShardScheduler := NewStaticTopologyShardScheduler(m.factory, m.nodePicker) + staticTopologyShardScheduler := NewStaticTopologyShardScheduler(m.factory, m.nodePicker, m.procedureExecutingBatchSize) return []Scheduler{staticTopologyShardScheduler} } func (m *ManagerImpl) createDynamicTopologySchedulers() []Scheduler { - assignShardScheduler := NewAssignShardScheduler(m.factory, m.nodePicker) - rebalancedShardScheduler := NewRebalancedShardScheduler(m.logger, m.factory, m.nodePicker) + assignShardScheduler := NewAssignShardScheduler(m.factory, m.nodePicker, m.procedureExecutingBatchSize) + rebalancedShardScheduler := NewRebalancedShardScheduler(m.logger, m.factory, m.nodePicker, m.procedureExecutingBatchSize) return []Scheduler{assignShardScheduler, rebalancedShardScheduler} } @@ -213,16 +215,16 @@ func (m *ManagerImpl) ListScheduler() []Scheduler { func (m *ManagerImpl) Scheduler(ctx context.Context, clusterSnapshot metadata.Snapshot) []ScheduleResult { // TODO: Every scheduler should run in an independent goroutine. 
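// A hedged sketch of that TODO using golang.org/x/sync/errgroup, which this diff's go.mod
// promotes to a direct dependency; the names mirror this method and the sketch is
// illustrative only (note it cancels the remaining schedulers on the first error, unlike the
// continue-on-error loop below):
//
//	g, gctx := errgroup.WithContext(ctx)
//	results := make([]ScheduleResult, len(m.registerSchedulers))
//	for i, s := range m.registerSchedulers {
//		i, s := i, s // capture loop variables (go.mod targets Go 1.19)
//		g.Go(func() error {
//			r, err := s.Schedule(gctx, clusterSnapshot)
//			if err != nil {
//				return err
//			}
//			results[i] = r
//			return nil
//		})
//	}
//	if err := g.Wait(); err != nil {
//		m.logger.Error("scheduler failed", zap.Error(err))
//	}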
- allResults := make([]ScheduleResult, 0, len(m.registerSchedulers)) + results := make([]ScheduleResult, 0, len(m.registerSchedulers)) for _, scheduler := range m.registerSchedulers { result, err := scheduler.Schedule(ctx, clusterSnapshot) if err != nil { m.logger.Error("scheduler failed", zap.Error(err)) continue } - allResults = append(allResults, result) + results = append(results, result) } - return allResults + return results } func (m *ManagerImpl) UpdateEnableSchedule(_ context.Context, enableSchedule bool) { diff --git a/server/coordinator/scheduler/scheduler_manager_test.go b/server/coordinator/scheduler/scheduler_manager_test.go index d2dec5cd..a9df691a 100644 --- a/server/coordinator/scheduler/scheduler_manager_test.go +++ b/server/coordinator/scheduler/scheduler_manager_test.go @@ -31,14 +31,14 @@ func TestSchedulerManager(t *testing.T) { _, client, _ := etcdutil.PrepareEtcdServerAndClient(t) // Create scheduler manager with enableScheduler equal to false. - schedulerManager := scheduler.NewManager(zap.NewNop(), procedureManager, f, c.GetMetadata(), client, "/rootPath", false, storage.TopologyTypeStatic) + schedulerManager := scheduler.NewManager(zap.NewNop(), procedureManager, f, c.GetMetadata(), client, "/rootPath", false, storage.TopologyTypeStatic, 1) err = schedulerManager.Start(ctx) re.NoError(err) err = schedulerManager.Stop(ctx) re.NoError(err) // Create scheduler manager with static topology. - schedulerManager = scheduler.NewManager(zap.NewNop(), procedureManager, f, c.GetMetadata(), client, "/rootPath", true, storage.TopologyTypeStatic) + schedulerManager = scheduler.NewManager(zap.NewNop(), procedureManager, f, c.GetMetadata(), client, "/rootPath", true, storage.TopologyTypeStatic, 1) err = schedulerManager.Start(ctx) re.NoError(err) schedulers := schedulerManager.ListScheduler() @@ -47,7 +47,7 @@ func TestSchedulerManager(t *testing.T) { re.NoError(err) // Create scheduler manager with dynamic topology. 
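// (With TopologyTypeDynamic the manager registers both AssignShardScheduler and
// RebalancedShardScheduler via createDynamicTopologySchedulers, each constructed with the
// batch size given here.)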
- schedulerManager = scheduler.NewManager(zap.NewNop(), procedureManager, f, c.GetMetadata(), client, "/rootPath", true, storage.TopologyTypeDynamic) + schedulerManager = scheduler.NewManager(zap.NewNop(), procedureManager, f, c.GetMetadata(), client, "/rootPath", true, storage.TopologyTypeDynamic, 1) err = schedulerManager.Start(ctx) re.NoError(err) schedulers = schedulerManager.ListScheduler() diff --git a/server/coordinator/scheduler/static_topology_shard_scheduler.go b/server/coordinator/scheduler/static_topology_shard_scheduler.go index 2b9105e8..ff4b08bd 100644 --- a/server/coordinator/scheduler/static_topology_shard_scheduler.go +++ b/server/coordinator/scheduler/static_topology_shard_scheduler.go @@ -16,12 +16,13 @@ import ( ) type StaticTopologyShardScheduler struct { - factory *coordinator.Factory - nodePicker coordinator.NodePicker + factory *coordinator.Factory + nodePicker coordinator.NodePicker + procedureExecutingBatchSize uint32 } -func NewStaticTopologyShardScheduler(factory *coordinator.Factory, nodePicker coordinator.NodePicker) Scheduler { - return &StaticTopologyShardScheduler{factory: factory, nodePicker: nodePicker} +func NewStaticTopologyShardScheduler(factory *coordinator.Factory, nodePicker coordinator.NodePicker, procedureExecutingBatchSize uint32) Scheduler { + return &StaticTopologyShardScheduler{factory: factory, nodePicker: nodePicker, procedureExecutingBatchSize: procedureExecutingBatchSize} } func (s *StaticTopologyShardScheduler) Schedule(ctx context.Context, clusterSnapshot metadata.Snapshot) (ScheduleResult, error) { @@ -53,7 +54,10 @@ func (s *StaticTopologyShardScheduler) Schedule(ctx context.Context, clusterSnap return ScheduleResult{}, err } procedures = append(procedures, p) - reasons.WriteString(fmt.Sprintf("Cluster initialization, shard:%d is assigned to node:%s \n", shardView.ShardID, newLeaderNode.Node.Name)) + reasons.WriteString(fmt.Sprintf("Cluster initialization, assign shard to node, shardID:%d, nodeName:%s. ", shardView.ShardID, newLeaderNode.Node.Name)) + if len(procedures) >= int(s.procedureExecutingBatchSize) { + break + } } case storage.ClusterStateStable: for i := 0; i < len(clusterSnapshot.Topology.ClusterView.ShardNodes); i++ { @@ -74,7 +78,10 @@ func (s *StaticTopologyShardScheduler) Schedule(ctx context.Context, clusterSnap return ScheduleResult{}, err } procedures = append(procedures, p) - reasons.WriteString(fmt.Sprintf("Cluster initialization, shard:%d is assigned to node:%s \n", shardNode.ID, node.Node.Name)) + reasons.WriteString(fmt.Sprintf("Cluster initialization, assign shard to node, shardID:%d, nodeName:%s. 
", shardNode.ID, node.Node.Name)) + if len(procedures) >= int(s.procedureExecutingBatchSize) { + break + } } } } diff --git a/server/coordinator/scheduler/static_topology_shard_scheduler_test.go b/server/coordinator/scheduler/static_topology_shard_scheduler_test.go index 47018623..c76c997a 100644 --- a/server/coordinator/scheduler/static_topology_shard_scheduler_test.go +++ b/server/coordinator/scheduler/static_topology_shard_scheduler_test.go @@ -19,7 +19,7 @@ func TestStaticTopologyScheduler(t *testing.T) { procedureFactory := coordinator.NewFactory(test.MockIDAllocator{}, test.MockDispatch{}, test.NewTestStorage(t)) - s := scheduler.NewStaticTopologyShardScheduler(procedureFactory, coordinator.NewConsistentHashNodePicker(zap.NewNop(), 50)) + s := scheduler.NewStaticTopologyShardScheduler(procedureFactory, coordinator.NewConsistentHashNodePicker(zap.NewNop(), 50), 1) // EmptyCluster would be scheduled an empty procedure. emptyCluster := test.InitEmptyCluster(ctx, t) diff --git a/server/server.go b/server/server.go index c77bf62d..051661cf 100644 --- a/server/server.go +++ b/server/server.go @@ -237,11 +237,12 @@ func (srv *Server) createDefaultCluster(ctx context.Context) error { } defaultCluster, err := srv.clusterManager.CreateCluster(ctx, srv.cfg.DefaultClusterName, metadata.CreateClusterOpts{ - NodeCount: uint32(srv.cfg.DefaultClusterNodeCount), - ReplicationFactor: uint32(srv.cfg.DefaultClusterReplicationFactor), - ShardTotal: uint32(srv.cfg.DefaultClusterShardTotal), - EnableSchedule: srv.cfg.EnableSchedule, - TopologyType: topologyType, + NodeCount: uint32(srv.cfg.DefaultClusterNodeCount), + ReplicationFactor: uint32(srv.cfg.DefaultClusterReplicationFactor), + ShardTotal: uint32(srv.cfg.DefaultClusterShardTotal), + EnableSchedule: srv.cfg.EnableSchedule, + TopologyType: topologyType, + ProcedureExecutingBatchSize: srv.cfg.ProcedureExecutingBatchSize, }) if err != nil { log.Warn("create default cluster failed", zap.Error(err)) diff --git a/server/service/grpc/service.go b/server/service/grpc/service.go index 94945f1f..692ffd62 100644 --- a/server/service/grpc/service.go +++ b/server/service/grpc/service.go @@ -297,7 +297,7 @@ func (s *Service) RouteTables(ctx context.Context, req *metaservicepb.RouteTable return &metaservicepb.RouteTablesResponse{Header: responseHeader(err, "grpc routeTables")}, nil } - log.Info("[RouteTable]", zap.String("schemaName", req.SchemaName), zap.String("clusterName", req.GetHeader().ClusterName), zap.String("tableNames", strings.Join(req.TableNames, ","))) + log.Debug("[RouteTable]", zap.String("schemaName", req.SchemaName), zap.String("clusterName", req.GetHeader().ClusterName), zap.String("tableNames", strings.Join(req.TableNames, ","))) // Forward request to the leader. 
if ceresmetaClient != nil { diff --git a/server/service/http/api.go b/server/service/http/api.go index 6eb4aaaa..cd2ed99e 100644 --- a/server/service/http/api.go +++ b/server/service/http/api.go @@ -542,10 +542,11 @@ func (a *API) createCluster(writer http.ResponseWriter, req *http.Request) { } type UpdateClusterRequest struct { - NodeCount uint32 `json:"NodeCount"` - ShardTotal uint32 `json:"ShardTotal"` - EnableSchedule bool `json:"enableSchedule"` - TopologyType string `json:"topologyType"` + NodeCount uint32 `json:"nodeCount"` + ShardTotal uint32 `json:"shardTotal"` + EnableSchedule bool `json:"enableSchedule"` + TopologyType string `json:"topologyType"` + ProcedureExecutingBatchSize uint32 `json:"procedureExecutingBatchSize"` } func (a *API) updateCluster(writer http.ResponseWriter, req *http.Request) { @@ -590,8 +591,9 @@ func (a *API) updateCluster(writer http.ResponseWriter, req *http.Request) { } if err := a.clusterManager.UpdateCluster(req.Context(), clusterName, metadata.UpdateClusterOpts{ - EnableSchedule: updateClusterRequest.EnableSchedule, - TopologyType: topologyType, + EnableSchedule: updateClusterRequest.EnableSchedule, + TopologyType: topologyType, + ProcedureExecutingBatchSize: updateClusterRequest.ProcedureExecutingBatchSize, }); err != nil { log.Error("update cluster failed", zap.Error(err)) a.respondError(writer, metadata.ErrUpdateCluster, fmt.Sprintf("err: %s", err.Error())) diff --git a/server/storage/types.go b/server/storage/types.go index a80cb826..02ef7857 100644 --- a/server/storage/types.go +++ b/server/storage/types.go @@ -152,12 +152,13 @@ type Cluster struct { Name string MinNodeCount uint32 // Deprecated: ReplicationFactor is deprecated after CeresMeta v1.2.0 - ReplicationFactor uint32 - ShardTotal uint32 - EnableSchedule bool - TopologyType TopologyType - CreatedAt uint64 - ModifiedAt uint64 + ReplicationFactor uint32 + ShardTotal uint32 + EnableSchedule bool + TopologyType TopologyType + ProcedureExecutingBatchSize uint32 + CreatedAt uint64 + ModifiedAt uint64 } type ShardNode struct { @@ -258,27 +259,29 @@ func convertNodeToPB(node Node) clusterpb.Node { func convertClusterPB(cluster *clusterpb.Cluster) Cluster { return Cluster{ - ID: ClusterID(cluster.Id), - Name: cluster.Name, - MinNodeCount: cluster.MinNodeCount, - ShardTotal: cluster.ShardTotal, - EnableSchedule: cluster.EnableSchedule, - TopologyType: convertTopologyTypePB(cluster.TopologyType), - CreatedAt: cluster.CreatedAt, - ModifiedAt: cluster.ModifiedAt, + ID: ClusterID(cluster.Id), + Name: cluster.Name, + MinNodeCount: cluster.MinNodeCount, + ShardTotal: cluster.ShardTotal, + EnableSchedule: cluster.EnableSchedule, + TopologyType: convertTopologyTypePB(cluster.TopologyType), + ProcedureExecutingBatchSize: cluster.ProcedureExecutingBatchSize, + CreatedAt: cluster.CreatedAt, + ModifiedAt: cluster.ModifiedAt, } } func convertClusterToPB(cluster Cluster) clusterpb.Cluster { return clusterpb.Cluster{ - Id: uint32(cluster.ID), - Name: cluster.Name, - MinNodeCount: cluster.MinNodeCount, - ShardTotal: cluster.ShardTotal, - EnableSchedule: cluster.EnableSchedule, - TopologyType: convertTopologyTypeToPB(cluster.TopologyType), - CreatedAt: cluster.CreatedAt, - ModifiedAt: cluster.ModifiedAt, + Id: uint32(cluster.ID), + Name: cluster.Name, + MinNodeCount: cluster.MinNodeCount, + ShardTotal: cluster.ShardTotal, + EnableSchedule: cluster.EnableSchedule, + TopologyType: convertTopologyTypeToPB(cluster.TopologyType), + ProcedureExecutingBatchSize: cluster.ProcedureExecutingBatchSize, + CreatedAt: 
cluster.CreatedAt, + ModifiedAt: cluster.ModifiedAt, } }
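
For reviewers, a minimal sketch of exercising the new field end to end over the HTTP API. The request shape mirrors UpdateClusterRequest in server/service/http/api.go; the endpoint path, HTTP method, port, and cluster name here are assumptions for illustration and are not shown in this diff:

package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"net/http"
)

// UpdateClusterRequest mirrors the JSON shape defined in server/service/http/api.go.
type UpdateClusterRequest struct {
	NodeCount                   uint32 `json:"nodeCount"`
	ShardTotal                  uint32 `json:"shardTotal"`
	EnableSchedule              bool   `json:"enableSchedule"`
	TopologyType                string `json:"topologyType"`
	ProcedureExecutingBatchSize uint32 `json:"procedureExecutingBatchSize"`
}

func main() {
	// Illustrative value: cap each scheduling batch at 32 procedures
	// (the config default in this diff is math.MaxUint32, i.e. effectively unbounded).
	body, err := json.Marshal(UpdateClusterRequest{
		EnableSchedule:              true,
		TopologyType:                "static",
		ProcedureExecutingBatchSize: 32,
	})
	if err != nil {
		panic(err)
	}
	// Hypothetical endpoint and cluster name; check the router registration for the real path.
	req, err := http.NewRequest(http.MethodPut, "http://127.0.0.1:8080/api/v1/clusters/defaultCluster", bytes.NewReader(body))
	if err != nil {
		panic(err)
	}
	req.Header.Set("Content-Type", "application/json")
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()
	fmt.Println(resp.Status)
}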