diff --git a/go.mod b/go.mod index 3a537796..85305f71 100644 --- a/go.mod +++ b/go.mod @@ -4,7 +4,7 @@ go 1.19 require ( github.com/AlekSi/gocov-xml v1.0.0 - github.com/CeresDB/ceresdbproto/golang v0.0.0-20230531074927-f60b148b3a5c + github.com/CeresDB/ceresdbproto/golang v0.0.0-20230707021308-fba75ce6409c github.com/axw/gocov v1.1.0 github.com/caarlos0/env/v6 v6.10.1 github.com/julienschmidt/httprouter v1.3.0 @@ -18,6 +18,8 @@ require ( go.etcd.io/etcd/client/v3 v3.5.4 go.etcd.io/etcd/server/v3 v3.5.4 go.uber.org/zap v1.21.0 + golang.org/x/sync v0.0.0-20210220032951-036812b2e83c + golang.org/x/time v0.0.0-20210220033141-f8bda1e9f3ba golang.org/x/tools v0.1.10 google.golang.org/grpc v1.47.0 google.golang.org/protobuf v1.28.0 @@ -89,11 +91,9 @@ require ( golang.org/x/crypto v0.0.0-20220131195533-30dcbda58838 // indirect golang.org/x/mod v0.6.0-dev.0.20220106191415-9b9b3d81d5e3 // indirect golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2 // indirect - golang.org/x/sync v0.0.0-20210220032951-036812b2e83c // indirect golang.org/x/sys v0.0.0-20211019181941-9d821ace8654 // indirect golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1 // indirect golang.org/x/text v0.3.7 // indirect - golang.org/x/time v0.0.0-20210220033141-f8bda1e9f3ba // indirect golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect google.golang.org/genproto v0.0.0-20210602131652-f16073e35f0c // indirect gopkg.in/natefinch/lumberjack.v2 v2.0.0 // indirect diff --git a/go.sum b/go.sum index b4f80a2c..adf12360 100644 --- a/go.sum +++ b/go.sum @@ -18,10 +18,8 @@ github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03 github.com/BurntSushi/toml v1.1.0 h1:ksErzDEI1khOiGPgpwuI7x2ebx/uXQNw7xJpn9Eq1+I= github.com/BurntSushi/toml v1.1.0/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbicEuybxQ= github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo= -github.com/CeresDB/ceresdbproto/golang v0.0.0-20230515021908-1b3a3eae3d60 h1:+/bcJ6M6SnXWjhA80c5Qq6u+LASrPGxoDCMIZoJcmaQ= -github.com/CeresDB/ceresdbproto/golang v0.0.0-20230515021908-1b3a3eae3d60/go.mod h1:qLTh6jtSu2ZLIFsU3iiDIKvkrQvyY/Csg6Mk0Ub0QZ4= -github.com/CeresDB/ceresdbproto/golang v0.0.0-20230531074927-f60b148b3a5c h1:Z/FkMasq2ZTcsKsFuiUaLi26mLyx23mxwlbt1NC/eRY= -github.com/CeresDB/ceresdbproto/golang v0.0.0-20230531074927-f60b148b3a5c/go.mod h1:qLTh6jtSu2ZLIFsU3iiDIKvkrQvyY/Csg6Mk0Ub0QZ4= +github.com/CeresDB/ceresdbproto/golang v0.0.0-20230707021308-fba75ce6409c h1:7gmSQsGua+Y1g6ygsC/K75T/zK2ki7y5R5BkrN1/Ymc= +github.com/CeresDB/ceresdbproto/golang v0.0.0-20230707021308-fba75ce6409c/go.mod h1:qLTh6jtSu2ZLIFsU3iiDIKvkrQvyY/Csg6Mk0Ub0QZ4= github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU= github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index 4cd1b899..d08ac03f 100644 --- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -39,7 +39,7 @@ func NewCluster(logger *zap.Logger, metadata *metadata.ClusterMetadata, client * dispatch := eventdispatch.NewDispatchImpl() procedureFactory := coordinator.NewFactory(id.NewAllocatorImpl(logger, client, defaultProcedurePrefixKey, defaultAllocStep), dispatch, procedureStorage) - schedulerManager := scheduler.NewManager(logger, procedureManager, 
procedureFactory, metadata, client, rootPath, metadata.GetEnableSchedule(), metadata.GetTopologyType()) + schedulerManager := scheduler.NewManager(logger, procedureManager, procedureFactory, metadata, client, rootPath, metadata.GetEnableSchedule(), metadata.GetTopologyType(), metadata.GetProcedureExecutingBatchSize()) return &Cluster{ logger: logger, diff --git a/server/cluster/manager.go b/server/cluster/manager.go index 3696aa7e..fea613da 100644 --- a/server/cluster/manager.go +++ b/server/cluster/manager.go @@ -112,14 +112,15 @@ func (m *managerImpl) CreateCluster(ctx context.Context, clusterName string, opt createTime := time.Now().UnixMilli() clusterMetadataStorage := storage.Cluster{ - ID: clusterID, - Name: clusterName, - MinNodeCount: opts.NodeCount, - ShardTotal: opts.ShardTotal, - EnableSchedule: opts.EnableSchedule, - TopologyType: opts.TopologyType, - CreatedAt: uint64(createTime), - ModifiedAt: uint64(createTime), + ID: clusterID, + Name: clusterName, + MinNodeCount: opts.NodeCount, + ShardTotal: opts.ShardTotal, + EnableSchedule: opts.EnableSchedule, + TopologyType: opts.TopologyType, + ProcedureExecutingBatchSize: opts.ProcedureExecutingBatchSize, + CreatedAt: uint64(createTime), + ModifiedAt: uint64(createTime), } err = m.storage.CreateCluster(ctx, storage.CreateClusterRequest{ Cluster: clusterMetadataStorage, @@ -164,14 +165,15 @@ func (m *managerImpl) UpdateCluster(ctx context.Context, clusterName string, opt } err = m.storage.UpdateCluster(ctx, storage.UpdateClusterRequest{Cluster: storage.Cluster{ - ID: c.GetMetadata().GetClusterID(), - Name: c.GetMetadata().Name(), - MinNodeCount: c.GetMetadata().GetClusterMinNodeCount(), - ShardTotal: c.GetMetadata().GetTotalShardNum(), - EnableSchedule: opt.EnableSchedule, - TopologyType: opt.TopologyType, - CreatedAt: c.GetMetadata().GetCreateTime(), - ModifiedAt: uint64(time.Now().UnixMilli()), + ID: c.GetMetadata().GetClusterID(), + Name: c.GetMetadata().Name(), + MinNodeCount: c.GetMetadata().GetClusterMinNodeCount(), + ShardTotal: c.GetMetadata().GetTotalShardNum(), + EnableSchedule: opt.EnableSchedule, + TopologyType: opt.TopologyType, + ProcedureExecutingBatchSize: opt.ProcedureExecutingBatchSize, + CreatedAt: c.GetMetadata().GetCreateTime(), + ModifiedAt: uint64(time.Now().UnixMilli()), }}) if err != nil { log.Error("update cluster", zap.Error(err)) @@ -333,14 +335,15 @@ func (m *managerImpl) Start(ctx context.Context) error { if clusterMetadata.GetStorageMetadata().TopologyType == storage.TopologyTypeUnknown { req := storage.UpdateClusterRequest{ Cluster: storage.Cluster{ - ID: metadataStorage.ID, - Name: metadataStorage.Name, - MinNodeCount: metadataStorage.MinNodeCount, - ShardTotal: metadataStorage.ShardTotal, - EnableSchedule: metadataStorage.EnableSchedule, - TopologyType: m.topologyType, - CreatedAt: metadataStorage.CreatedAt, - ModifiedAt: uint64(time.Now().UnixMilli()), + ID: metadataStorage.ID, + Name: metadataStorage.Name, + MinNodeCount: metadataStorage.MinNodeCount, + ShardTotal: metadataStorage.ShardTotal, + EnableSchedule: metadataStorage.EnableSchedule, + TopologyType: m.topologyType, + ProcedureExecutingBatchSize: metadataStorage.ProcedureExecutingBatchSize, + CreatedAt: metadataStorage.CreatedAt, + ModifiedAt: uint64(time.Now().UnixMilli()), }, } if err := m.storage.UpdateCluster(ctx, req); err != nil { diff --git a/server/cluster/metadata/cluster_metadata.go b/server/cluster/metadata/cluster_metadata.go index 781f8d92..112f98aa 100644 --- a/server/cluster/metadata/cluster_metadata.go +++ 
b/server/cluster/metadata/cluster_metadata.go @@ -576,6 +576,13 @@ func (c *ClusterMetadata) GetTopologyType() storage.TopologyType { return c.metaData.TopologyType } +func (c *ClusterMetadata) GetProcedureExecutingBatchSize() uint32 { + c.lock.RLock() + defer c.lock.RUnlock() + + return c.metaData.ProcedureExecutingBatchSize +} + func (c *ClusterMetadata) GetCreateTime() uint64 { c.lock.RLock() defer c.lock.RUnlock() diff --git a/server/cluster/metadata/types.go b/server/cluster/metadata/types.go index 08bd7531..bcac4210 100644 --- a/server/cluster/metadata/types.go +++ b/server/cluster/metadata/types.go @@ -47,16 +47,18 @@ type ShardNodeWithVersion struct { } type CreateClusterOpts struct { - NodeCount uint32 - ReplicationFactor uint32 - ShardTotal uint32 - EnableSchedule bool - TopologyType storage.TopologyType + NodeCount uint32 + ReplicationFactor uint32 + ShardTotal uint32 + EnableSchedule bool + TopologyType storage.TopologyType + ProcedureExecutingBatchSize uint32 } type UpdateClusterOpts struct { - EnableSchedule bool - TopologyType storage.TopologyType + EnableSchedule bool + TopologyType storage.TopologyType + ProcedureExecutingBatchSize uint32 } type CreateTableMetadataRequest struct { diff --git a/server/config/config.go b/server/config/config.go index 8ba4630f..f7281658 100644 --- a/server/config/config.go +++ b/server/config/config.go @@ -5,6 +5,7 @@ package config import ( "flag" "fmt" + "math" "os" "strings" "time" @@ -52,7 +53,8 @@ const ( defaultClusterShardTotal = 8 enableSchedule = true // topologyType is used to determine the scheduling cluster strategy of CeresMeta. It should be determined according to the storage method of CeresDB. The default is static to support local storage. - defaultTopologyType = "static" + defaultTopologyType = "static" + defaultProcedureExecutingBatchSize = math.MaxUint32 defaultHTTPPort = 8080 @@ -127,6 +129,8 @@ type Config struct { EnableSchedule bool `toml:"enable-schedule" env:"ENABLE_SCHEDULE"` // TopologyType indicates the schedule type used by the CeresDB cluster, it will determine the strategy of CeresMeta scheduling cluster. TopologyType string `toml:"topology-type" env:"TOPOLOGY_TYPE"` + // ProcedureExecutingBatchSize determines the maximum number of shards in a single batch when opening shards concurrently. 
+ ProcedureExecutingBatchSize uint32 `toml:"procedure-executing-batch-size" env:"PROCEDURE_EXECUTING_BATCH_SIZE"` ClientUrls string `toml:"client-urls" env:"CLIENT_URLS"` PeerUrls string `toml:"peer-urls" env:"PEER_URLS"` @@ -296,6 +300,7 @@ func MakeConfigParser() (*Parser, error) { DefaultClusterShardTotal: defaultClusterShardTotal, EnableSchedule: enableSchedule, TopologyType: defaultTopologyType, + ProcedureExecutingBatchSize: defaultProcedureExecutingBatchSize, HTTPPort: defaultHTTPPort, } diff --git a/server/coordinator/factory.go b/server/coordinator/factory.go index 8e46d870..c5f0ec07 100644 --- a/server/coordinator/factory.go +++ b/server/coordinator/factory.go @@ -81,6 +81,11 @@ type CreatePartitionTableRequest struct { OnFailed func(error) error } +type BatchRequest struct { + Batch []procedure.Procedure + BatchType procedure.Typ +} + func NewFactory(allocator id.Allocator, dispatch eventdispatch.Dispatch, storage procedure.Storage) *Factory { return &Factory{ idAllocator: allocator, @@ -253,6 +258,15 @@ func (f *Factory) CreateSplitProcedure(ctx context.Context, request SplitRequest ) } +func (f *Factory) CreateBatchTransferLeaderProcedure(ctx context.Context, request BatchRequest) (procedure.Procedure, error) { + id, err := f.allocProcedureID(ctx) + if err != nil { + return nil, err + } + + return transferleader.NewBatchTransferLeaderProcedure(id, request.Batch) +} + func (f *Factory) allocProcedureID(ctx context.Context) (uint64, error) { id, err := f.idAllocator.Alloc(ctx) if err != nil { diff --git a/server/coordinator/procedure/error.go b/server/coordinator/procedure/error.go index 44c2f5bb..29a8864f 100644 --- a/server/coordinator/procedure/error.go +++ b/server/coordinator/procedure/error.go @@ -22,4 +22,6 @@ var ( ErrQueueFull = coderr.NewCodeError(coderr.Internal, "queue is full, unable to offer more data") ErrPushDuplicatedProcedure = coderr.NewCodeError(coderr.Internal, "try to push duplicated procedure") ErrShardNumberNotEnough = coderr.NewCodeError(coderr.Internal, "shard number not enough") + ErrEmptyBatchProcedure = coderr.NewCodeError(coderr.Internal, "procedure batch is empty") + ErrMergeBatchProcedure = coderr.NewCodeError(coderr.Internal, "failed to merge procedures batch") ) diff --git a/server/coordinator/procedure/manager_impl.go b/server/coordinator/procedure/manager_impl.go index ff37a82c..d60437d1 100644 --- a/server/coordinator/procedure/manager_impl.go +++ b/server/coordinator/procedure/manager_impl.go @@ -75,6 +75,7 @@ func (m *ManagerImpl) Stop(ctx context.Context) error { return nil } +// TODO: Filter duplicate submitted Procedure. func (m *ManagerImpl) Submit(_ context.Context, procedure Procedure) error { if err := m.waitingProcedures.Push(procedure, 0); err != nil { return err diff --git a/server/coordinator/procedure/operation/transferleader/batch_transfer_leader.go b/server/coordinator/procedure/operation/transferleader/batch_transfer_leader.go new file mode 100644 index 00000000..8d2f5fa2 --- /dev/null +++ b/server/coordinator/procedure/operation/transferleader/batch_transfer_leader.go @@ -0,0 +1,130 @@ +// Copyright 2023 CeresDB Project Authors. Licensed under Apache-2.0. 
+ +package transferleader + +import ( + "context" + "fmt" + "sync" + + "github.com/CeresDB/ceresmeta/pkg/log" + "github.com/CeresDB/ceresmeta/server/coordinator/procedure" + "github.com/CeresDB/ceresmeta/server/storage" + "github.com/pkg/errors" + "go.uber.org/zap" + "golang.org/x/sync/errgroup" +) + +// BatchTransferLeaderProcedure is a proxy procedure that contains a batch of TransferLeaderProcedures. +// It is used to support concurrent execution of a batch of TransferLeaderProcedures with the same version. +type BatchTransferLeaderProcedure struct { + id uint64 + batch []procedure.Procedure + relatedVersionInfo procedure.RelatedVersionInfo + + // Protect the state. + lock sync.RWMutex + state procedure.State +} + +func NewBatchTransferLeaderProcedure(id uint64, batch []procedure.Procedure) (procedure.Procedure, error) { + if len(batch) == 0 { + return nil, procedure.ErrEmptyBatchProcedure + } + + relatedVersionInfo, err := buildBatchRelatedVersionInfo(batch) + if err != nil { + return nil, err + } + + return &BatchTransferLeaderProcedure{id: id, batch: batch, state: procedure.StateInit, relatedVersionInfo: relatedVersionInfo}, nil +} + +func buildBatchRelatedVersionInfo(batch []procedure.Procedure) (procedure.RelatedVersionInfo, error) { + if len(batch) == 0 { + return procedure.RelatedVersionInfo{}, nil + } + + result := procedure.RelatedVersionInfo{ + ClusterID: batch[0].RelatedVersionInfo().ClusterID, + ShardWithVersion: map[storage.ShardID]uint64{}, + ClusterVersion: batch[0].RelatedVersionInfo().ClusterVersion, + } + + // The cluster ID and cluster version of all procedures in this batch must be the same. + for _, p := range batch { + if p.RelatedVersionInfo().ClusterID != result.ClusterID { + return procedure.RelatedVersionInfo{}, errors.WithMessage(procedure.ErrMergeBatchProcedure, "procedure clusterID in the same batch is inconsistent") + } + if p.RelatedVersionInfo().ClusterVersion != result.ClusterVersion { + return procedure.RelatedVersionInfo{}, errors.WithMessage(procedure.ErrMergeBatchProcedure, "procedure clusterVersion in the same batch is inconsistent") + } + // The ShardVersion of the same shard must be consistent. + for shardID, version := range p.RelatedVersionInfo().ShardWithVersion { + if resultVersion, exists := result.ShardWithVersion[shardID]; exists { + if version != resultVersion { + return procedure.RelatedVersionInfo{}, errors.WithMessage(procedure.ErrMergeBatchProcedure, fmt.Sprintf("procedure shardVersion in the same batch is inconsistent, shardID:%d, expectedShardVersion:%d, shardVersion:%d", shardID, version, resultVersion)) + } + } else { + result.ShardWithVersion[shardID] = version + } + } + } + + return result, nil +} + +func (p *BatchTransferLeaderProcedure) ID() uint64 { + return p.id +} + +func (p *BatchTransferLeaderProcedure) Typ() procedure.Typ { + return procedure.TransferLeader +} + +func (p *BatchTransferLeaderProcedure) Start(ctx context.Context) error { + // Start procedures in multiple goroutines. 
+ g, _ := errgroup.WithContext(ctx) + for _, p := range p.batch { + p := p + g.Go(func() error { + err := p.Start(ctx) + if err != nil { + log.Error("procedure start failed", zap.Error(err), zap.String("procedure", fmt.Sprintf("%v", p))) + } + return err + }) + } + + if err := g.Wait(); err != nil { + p.updateStateWithLock(procedure.StateFailed) + return err + } + + p.updateStateWithLock(procedure.StateFinished) + return nil +} + +func (p *BatchTransferLeaderProcedure) Cancel(_ context.Context) error { + p.updateStateWithLock(procedure.StateCancelled) + return nil +} + +func (p *BatchTransferLeaderProcedure) State() procedure.State { + return p.state +} + +func (p *BatchTransferLeaderProcedure) RelatedVersionInfo() procedure.RelatedVersionInfo { + return p.relatedVersionInfo +} + +func (p *BatchTransferLeaderProcedure) Priority() procedure.Priority { + return p.batch[0].Priority() +} + +func (p *BatchTransferLeaderProcedure) updateStateWithLock(state procedure.State) { + p.lock.Lock() + defer p.lock.Unlock() + + p.state = state +} diff --git a/server/coordinator/procedure/operation/transferleader/batch_transfer_leader_test.go b/server/coordinator/procedure/operation/transferleader/batch_transfer_leader_test.go new file mode 100644 index 00000000..4838ccbd --- /dev/null +++ b/server/coordinator/procedure/operation/transferleader/batch_transfer_leader_test.go @@ -0,0 +1,106 @@ +// Copyright 2023 CeresDB Project Authors. Licensed under Apache-2.0. + +package transferleader_test + +import ( + "context" + "testing" + + "github.com/CeresDB/ceresmeta/server/coordinator/procedure" + "github.com/CeresDB/ceresmeta/server/coordinator/procedure/operation/transferleader" + "github.com/CeresDB/ceresmeta/server/storage" + "github.com/stretchr/testify/require" +) + +type mockProcedure struct { + ClusterID storage.ClusterID + clusterVersion uint64 + typ procedure.Typ + ShardWithVersion map[storage.ShardID]uint64 +} + +func (m mockProcedure) ID() uint64 { + return 0 +} + +func (m mockProcedure) Typ() procedure.Typ { + return m.typ +} + +func (m mockProcedure) Start(_ context.Context) error { + return nil +} + +func (m mockProcedure) Cancel(_ context.Context) error { + return nil +} + +func (m mockProcedure) State() procedure.State { + return procedure.StateInit +} + +func (m mockProcedure) RelatedVersionInfo() procedure.RelatedVersionInfo { + return procedure.RelatedVersionInfo{ + ClusterID: m.ClusterID, + ShardWithVersion: m.ShardWithVersion, + ClusterVersion: m.clusterVersion, + } +} + +func (m mockProcedure) Priority() procedure.Priority { + return procedure.PriorityLow +} + +func TestBatchProcedure(t *testing.T) { + re := require.New(t) + var procedures []procedure.Procedure + + // Procedures with the same type and version. + for i := 0; i < 3; i++ { + shardWithVersion := map[storage.ShardID]uint64{} + shardWithVersion[storage.ShardID(i)] = 0 + p := CreateMockProcedure(storage.ClusterID(0), 0, 0, shardWithVersion) + procedures = append(procedures, p) + } + _, err := transferleader.NewBatchTransferLeaderProcedure(0, procedures) + re.NoError(err) + + // Procedures with different clusterIDs. + for i := 0; i < 3; i++ { + shardWithVersion := map[storage.ShardID]uint64{} + shardWithVersion[storage.ShardID(i)] = 0 + p := CreateMockProcedure(storage.ClusterID(i), 0, procedure.TransferLeader, shardWithVersion) + procedures = append(procedures, p) + } + _, err = transferleader.NewBatchTransferLeaderProcedure(0, procedures) + re.Error(err) + + // Procedures with different types. 
+ for i := 0; i < 3; i++ { + shardWithVersion := map[storage.ShardID]uint64{} + shardWithVersion[storage.ShardID(i)] = 0 + p := CreateMockProcedure(0, 0, procedure.Typ(i), shardWithVersion) + procedures = append(procedures, p) + } + _, err = transferleader.NewBatchTransferLeaderProcedure(0, procedures) + re.Error(err) + + // Procedures with different version. + for i := 0; i < 3; i++ { + shardWithVersion := map[storage.ShardID]uint64{} + shardWithVersion[storage.ShardID(0)] = uint64(i) + p := CreateMockProcedure(0, 0, procedure.Typ(i), shardWithVersion) + procedures = append(procedures, p) + } + _, err = transferleader.NewBatchTransferLeaderProcedure(0, procedures) + re.Error(err) +} + +func CreateMockProcedure(clusterID storage.ClusterID, clusterVersion uint64, typ procedure.Typ, shardWithVersion map[storage.ShardID]uint64) procedure.Procedure { + return mockProcedure{ + ClusterID: clusterID, + clusterVersion: clusterVersion, + typ: typ, + ShardWithVersion: shardWithVersion, + } +} diff --git a/server/coordinator/procedure/operation/transferleader/transfer_leader.go b/server/coordinator/procedure/operation/transferleader/transfer_leader.go index ab32098e..ce0672e5 100644 --- a/server/coordinator/procedure/operation/transferleader/transfer_leader.go +++ b/server/coordinator/procedure/operation/transferleader/transfer_leader.go @@ -279,14 +279,14 @@ func openNewShardCallback(event *fsm.Event) { Shard: metadata.ShardInfo{ID: req.p.params.ShardID, Role: storage.ShardRoleLeader, Version: preVersion + 1}, } - log.Info("try to open shard", zap.Uint64("procedureID", req.p.ID()), zap.Uint64("shardID", req.p.params.ID), zap.String("newLeader", req.p.params.NewLeaderNodeName)) + log.Info("try to open shard", zap.Uint64("procedureID", req.p.ID()), zap.Uint64("shardID", uint64(req.p.params.ShardID)), zap.String("newLeader", req.p.params.NewLeaderNodeName)) if err := req.p.params.Dispatch.OpenShard(ctx, req.p.params.NewLeaderNodeName, openShardRequest); err != nil { procedure.CancelEventWithLog(event, err, "open shard", zap.Uint32("shardID", uint32(req.p.params.ShardID)), zap.String("newLeaderNode", req.p.params.NewLeaderNodeName)) return } - log.Info("open shard finish", zap.Uint64("procedureID", req.p.ID()), zap.Uint64("shardID", req.p.params.ID), zap.String("newLeader", req.p.params.NewLeaderNodeName)) + log.Info("open shard finish", zap.Uint64("procedureID", req.p.ID()), zap.Uint64("shardID", uint64(req.p.params.ShardID)), zap.String("newLeader", req.p.params.NewLeaderNodeName)) } func finishCallback(event *fsm.Event) { diff --git a/server/coordinator/procedure/test/common.go b/server/coordinator/procedure/test/common.go index dc120e36..b267d846 100644 --- a/server/coordinator/procedure/test/common.go +++ b/server/coordinator/procedure/test/common.go @@ -6,6 +6,7 @@ import ( "context" "crypto/rand" "fmt" + "math" "math/big" "testing" "time" @@ -21,16 +22,17 @@ import ( ) const ( - TestTableName0 = "table0" - TestTableName1 = "table1" - TestSchemaName = "TestSchemaName" - TestRootPath = "/rootPath" - DefaultIDAllocatorStep = 20 - ClusterName = "ceresdbCluster1" - DefaultNodeCount = 2 - DefaultShardTotal = 4 - DefaultSchedulerOperator = true - DefaultTopologyType = "static" + TestTableName0 = "table0" + TestTableName1 = "table1" + TestSchemaName = "TestSchemaName" + TestRootPath = "/rootPath" + DefaultIDAllocatorStep = 20 + ClusterName = "ceresdbCluster1" + DefaultNodeCount = 2 + DefaultShardTotal = 4 + DefaultSchedulerOperator = true + DefaultTopologyType = "static" + 
DefaultProcedureExecutingBatchSize = math.MaxUint32 ) type MockDispatch struct{} @@ -99,13 +101,14 @@ func InitEmptyCluster(ctx context.Context, t *testing.T) *cluster.Cluster { logger := zap.NewNop() clusterMetadata := metadata.NewClusterMetadata(logger, storage.Cluster{ - ID: 0, - Name: ClusterName, - MinNodeCount: DefaultNodeCount, - ShardTotal: DefaultShardTotal, - EnableSchedule: DefaultSchedulerOperator, - TopologyType: DefaultTopologyType, - CreatedAt: 0, + ID: 0, + Name: ClusterName, + MinNodeCount: DefaultNodeCount, + ShardTotal: DefaultShardTotal, + EnableSchedule: DefaultSchedulerOperator, + TopologyType: DefaultTopologyType, + ProcedureExecutingBatchSize: DefaultProcedureExecutingBatchSize, + CreatedAt: 0, }, clusterStorage, client, TestRootPath, DefaultIDAllocatorStep) err := clusterMetadata.Init(ctx) diff --git a/server/coordinator/scheduler/assign_shard_scheduler.go b/server/coordinator/scheduler/assign_shard_scheduler.go index 26f756ba..9d1ebdeb 100644 --- a/server/coordinator/scheduler/assign_shard_scheduler.go +++ b/server/coordinator/scheduler/assign_shard_scheduler.go @@ -5,26 +5,26 @@ package scheduler import ( "context" "fmt" + "strings" "github.com/CeresDB/ceresmeta/server/cluster/metadata" "github.com/CeresDB/ceresmeta/server/coordinator" + "github.com/CeresDB/ceresmeta/server/coordinator/procedure" "github.com/CeresDB/ceresmeta/server/storage" ) -const ( - AssignReason = "ShardView exists in metadata but shardNode not exists, assign shard to node" -) - // AssignShardScheduler used to assigning shards without nodes. type AssignShardScheduler struct { - factory *coordinator.Factory - nodePicker coordinator.NodePicker + factory *coordinator.Factory + nodePicker coordinator.NodePicker + procedureExecutingBatchSize uint32 } -func NewAssignShardScheduler(factory *coordinator.Factory, nodePicker coordinator.NodePicker) Scheduler { +func NewAssignShardScheduler(factory *coordinator.Factory, nodePicker coordinator.NodePicker, procedureExecutingBatchSize uint32) Scheduler { return &AssignShardScheduler{ - factory: factory, - nodePicker: nodePicker, + factory: factory, + nodePicker: nodePicker, + procedureExecutingBatchSize: procedureExecutingBatchSize, } } @@ -33,6 +33,8 @@ func (a AssignShardScheduler) Schedule(ctx context.Context, clusterSnapshot meta return ScheduleResult{}, nil } + var procedures []procedure.Procedure + var reasons strings.Builder // Check whether there is a shard without node mapping. 
for _, shardView := range clusterSnapshot.Topology.ShardViewsMapping { _, exists := findNodeByShard(shardView.ShardID, clusterSnapshot.Topology.ClusterView.ShardNodes) @@ -53,12 +55,30 @@ func (a AssignShardScheduler) Schedule(ctx context.Context, clusterSnapshot meta if err != nil { return ScheduleResult{}, err } - return ScheduleResult{ - Procedure: p, - Reason: fmt.Sprintf("try to assign shard:%d to node:%s ,reason:%v", shardView.ShardID, newLeaderNode.Node.Name, AssignReason), - }, nil + + procedures = append(procedures, p) + reasons.WriteString(fmt.Sprintf("the shard is not assigned to any node, try to assign it to node, shardID:%d, node:%s.", shardView.ShardID, newLeaderNode.Node.Name)) + if len(procedures) >= int(a.procedureExecutingBatchSize) { + break + } + } + + if len(procedures) == 0 { + return ScheduleResult{}, nil } - return ScheduleResult{}, nil + + batchProcedure, err := a.factory.CreateBatchTransferLeaderProcedure(ctx, coordinator.BatchRequest{ + Batch: procedures, + BatchType: procedure.TransferLeader, + }) + if err != nil { + return ScheduleResult{}, err + } + + return ScheduleResult{ + batchProcedure, + reasons.String(), + }, nil } func findNodeByShard(shardID storage.ShardID, shardNodes []storage.ShardNode) (storage.ShardNode, bool) { diff --git a/server/coordinator/scheduler/assign_shard_scheduler_test.go b/server/coordinator/scheduler/assign_shard_scheduler_test.go index 7493ace8..1fd2aed6 100644 --- a/server/coordinator/scheduler/assign_shard_scheduler_test.go +++ b/server/coordinator/scheduler/assign_shard_scheduler_test.go @@ -19,23 +19,23 @@ func TestAssignSchedule(t *testing.T) { procedureFactory := coordinator.NewFactory(test.MockIDAllocator{}, test.MockDispatch{}, test.NewTestStorage(t)) - s := scheduler.NewAssignShardScheduler(procedureFactory, coordinator.NewConsistentHashNodePicker(zap.NewNop(), 50)) + s := scheduler.NewAssignShardScheduler(procedureFactory, coordinator.NewConsistentHashNodePicker(zap.NewNop(), 50), 1) // EmptyCluster would be scheduled an empty procedure. emptyCluster := test.InitEmptyCluster(ctx, t) result, err := s.Schedule(ctx, emptyCluster.GetMetadata().GetClusterSnapshot()) re.NoError(err) - re.Nil(result.Procedure) + re.Empty(result) // PrepareCluster would be scheduled a transfer leader procedure. prepareCluster := test.InitPrepareCluster(ctx, t) result, err = s.Schedule(ctx, prepareCluster.GetMetadata().GetClusterSnapshot()) re.NoError(err) - re.NotNil(result.Procedure) + re.NotEmpty(result) // StableCluster with all shards assigned would be scheduled an empty procedure. 
stableCluster := test.InitStableCluster(ctx, t) result, err = s.Schedule(ctx, stableCluster.GetMetadata().GetClusterSnapshot()) re.NoError(err) - re.Nil(result.Procedure) + re.Empty(result) } diff --git a/server/coordinator/scheduler/rebalanced_shard_scheduler.go b/server/coordinator/scheduler/rebalanced_shard_scheduler.go index 7517273e..63b70feb 100644 --- a/server/coordinator/scheduler/rebalanced_shard_scheduler.go +++ b/server/coordinator/scheduler/rebalanced_shard_scheduler.go @@ -5,23 +5,27 @@ package scheduler import ( "context" "fmt" + "strings" "github.com/CeresDB/ceresmeta/server/cluster/metadata" "github.com/CeresDB/ceresmeta/server/coordinator" + "github.com/CeresDB/ceresmeta/server/coordinator/procedure" "go.uber.org/zap" ) type RebalancedShardScheduler struct { - logger *zap.Logger - factory *coordinator.Factory - nodePicker coordinator.NodePicker + logger *zap.Logger + factory *coordinator.Factory + nodePicker coordinator.NodePicker + procedureExecutingBatchSize uint32 } -func NewRebalancedShardScheduler(logger *zap.Logger, factory *coordinator.Factory, nodePicker coordinator.NodePicker) Scheduler { +func NewRebalancedShardScheduler(logger *zap.Logger, factory *coordinator.Factory, nodePicker coordinator.NodePicker, procedureExecutingBatchSize uint32) Scheduler { return &RebalancedShardScheduler{ - logger: logger, - factory: factory, - nodePicker: nodePicker, + logger: logger, + factory: factory, + nodePicker: nodePicker, + procedureExecutingBatchSize: procedureExecutingBatchSize, } } @@ -31,6 +35,8 @@ func (r RebalancedShardScheduler) Schedule(ctx context.Context, clusterSnapshot return ScheduleResult{}, nil } + var procedures []procedure.Procedure + var reasons strings.Builder // TODO: Improve scheduling efficiency and verify whether the topology changes. 
for _, shardNode := range clusterSnapshot.Topology.ClusterView.ShardNodes { node, err := r.nodePicker.PickNode(ctx, shardNode.ID, clusterSnapshot.RegisteredNodes) @@ -48,12 +54,25 @@ func (r RebalancedShardScheduler) Schedule(ctx context.Context, clusterSnapshot if err != nil { return ScheduleResult{}, err } - return ScheduleResult{ - Procedure: p, - Reason: fmt.Sprintf("the shard:%d on the node:%s does not meet the balance requirements,it should be assigned to node:%s", shardNode.ID, shardNode.NodeName, node.Node.Name), - }, nil + procedures = append(procedures, p) + reasons.WriteString(fmt.Sprintf("the shard does not meet the balance requirements,it should be assigned to node, shardID:%d, oldNode:%s, newNode:%s.", shardNode.ID, shardNode.NodeName, node.Node.Name)) + if len(procedures) >= int(r.procedureExecutingBatchSize) { + break + } } } - return ScheduleResult{}, nil + if len(procedures) == 0 { + return ScheduleResult{}, nil + } + + batchProcedure, err := r.factory.CreateBatchTransferLeaderProcedure(ctx, coordinator.BatchRequest{ + Batch: procedures, + BatchType: procedure.TransferLeader, + }) + if err != nil { + return ScheduleResult{}, err + } + + return ScheduleResult{batchProcedure, reasons.String()}, nil } diff --git a/server/coordinator/scheduler/rebalanced_shard_scheduler_test.go b/server/coordinator/scheduler/rebalanced_shard_scheduler_test.go index 89c2f80c..ce926287 100644 --- a/server/coordinator/scheduler/rebalanced_shard_scheduler_test.go +++ b/server/coordinator/scheduler/rebalanced_shard_scheduler_test.go @@ -19,22 +19,22 @@ func TestRebalancedScheduler(t *testing.T) { procedureFactory := coordinator.NewFactory(test.MockIDAllocator{}, test.MockDispatch{}, test.NewTestStorage(t)) - s := scheduler.NewRebalancedShardScheduler(zap.NewNop(), procedureFactory, coordinator.NewConsistentHashNodePicker(zap.NewNop(), 50)) + s := scheduler.NewRebalancedShardScheduler(zap.NewNop(), procedureFactory, coordinator.NewConsistentHashNodePicker(zap.NewNop(), 50), 1) // EmptyCluster would be scheduled an empty procedure. emptyCluster := test.InitEmptyCluster(ctx, t) result, err := s.Schedule(ctx, emptyCluster.GetMetadata().GetClusterSnapshot()) re.NoError(err) - re.Nil(result.Procedure) + re.Empty(result) // PrepareCluster would be scheduled an empty procedure. prepareCluster := test.InitPrepareCluster(ctx, t) result, err = s.Schedule(ctx, prepareCluster.GetMetadata().GetClusterSnapshot()) re.NoError(err) - re.Nil(result.Procedure) + re.Empty(result) // StableCluster with all shards assigned would be scheduled a load balance procedure. stableCluster := test.InitStableCluster(ctx, t) - result, err = s.Schedule(ctx, stableCluster.GetMetadata().GetClusterSnapshot()) + _, err = s.Schedule(ctx, stableCluster.GetMetadata().GetClusterSnapshot()) re.NoError(err) } diff --git a/server/coordinator/scheduler/scheduler_manager.go b/server/coordinator/scheduler/scheduler_manager.go index c3312c69..2b78eec4 100644 --- a/server/coordinator/scheduler/scheduler_manager.go +++ b/server/coordinator/scheduler/scheduler_manager.go @@ -52,15 +52,16 @@ type ManagerImpl struct { rootPath string // This lock is used to protect the following field. 
- lock sync.RWMutex - registerSchedulers []Scheduler - shardWatch watch.ShardWatch - isRunning atomic.Bool - enableSchedule bool - topologyType storage.TopologyType + lock sync.RWMutex + registerSchedulers []Scheduler + shardWatch watch.ShardWatch + isRunning atomic.Bool + enableSchedule bool + topologyType storage.TopologyType + procedureExecutingBatchSize uint32 } -func NewManager(logger *zap.Logger, procedureManager procedure.Manager, factory *coordinator.Factory, clusterMetadata *metadata.ClusterMetadata, client *clientv3.Client, rootPath string, enableSchedule bool, topologyType storage.TopologyType) Manager { +func NewManager(logger *zap.Logger, procedureManager procedure.Manager, factory *coordinator.Factory, clusterMetadata *metadata.ClusterMetadata, client *clientv3.Client, rootPath string, enableSchedule bool, topologyType storage.TopologyType, procedureExecutingBatchSize uint32) Manager { var shardWatch watch.ShardWatch switch topologyType { case storage.TopologyTypeDynamic: @@ -71,17 +72,18 @@ func NewManager(logger *zap.Logger, procedureManager procedure.Manager, factory } return &ManagerImpl{ - procedureManager: procedureManager, - registerSchedulers: []Scheduler{}, - factory: factory, - nodePicker: coordinator.NewConsistentHashNodePicker(logger, defaultHashReplicas), - clusterMetadata: clusterMetadata, - client: client, - shardWatch: shardWatch, - rootPath: rootPath, - enableSchedule: enableSchedule, - topologyType: topologyType, - logger: logger, + procedureManager: procedureManager, + registerSchedulers: []Scheduler{}, + factory: factory, + nodePicker: coordinator.NewConsistentHashNodePicker(logger, defaultHashReplicas), + clusterMetadata: clusterMetadata, + client: client, + shardWatch: shardWatch, + rootPath: rootPath, + enableSchedule: enableSchedule, + topologyType: topologyType, + procedureExecutingBatchSize: procedureExecutingBatchSize, + logger: logger, } } @@ -189,13 +191,13 @@ func (m *ManagerImpl) initRegister() { } func (m *ManagerImpl) createStaticTopologySchedulers() []Scheduler { - staticTopologyShardScheduler := NewStaticTopologyShardScheduler(m.factory, m.nodePicker) + staticTopologyShardScheduler := NewStaticTopologyShardScheduler(m.factory, m.nodePicker, m.procedureExecutingBatchSize) return []Scheduler{staticTopologyShardScheduler} } func (m *ManagerImpl) createDynamicTopologySchedulers() []Scheduler { - assignShardScheduler := NewAssignShardScheduler(m.factory, m.nodePicker) - rebalancedShardScheduler := NewRebalancedShardScheduler(m.logger, m.factory, m.nodePicker) + assignShardScheduler := NewAssignShardScheduler(m.factory, m.nodePicker, m.procedureExecutingBatchSize) + rebalancedShardScheduler := NewRebalancedShardScheduler(m.logger, m.factory, m.nodePicker, m.procedureExecutingBatchSize) return []Scheduler{assignShardScheduler, rebalancedShardScheduler} } diff --git a/server/coordinator/scheduler/scheduler_manager_test.go b/server/coordinator/scheduler/scheduler_manager_test.go index d2dec5cd..a9df691a 100644 --- a/server/coordinator/scheduler/scheduler_manager_test.go +++ b/server/coordinator/scheduler/scheduler_manager_test.go @@ -31,14 +31,14 @@ func TestSchedulerManager(t *testing.T) { _, client, _ := etcdutil.PrepareEtcdServerAndClient(t) // Create scheduler manager with enableScheduler equal to false. 
- schedulerManager := scheduler.NewManager(zap.NewNop(), procedureManager, f, c.GetMetadata(), client, "/rootPath", false, storage.TopologyTypeStatic) + schedulerManager := scheduler.NewManager(zap.NewNop(), procedureManager, f, c.GetMetadata(), client, "/rootPath", false, storage.TopologyTypeStatic, 1) err = schedulerManager.Start(ctx) re.NoError(err) err = schedulerManager.Stop(ctx) re.NoError(err) // Create scheduler manager with static topology. - schedulerManager = scheduler.NewManager(zap.NewNop(), procedureManager, f, c.GetMetadata(), client, "/rootPath", true, storage.TopologyTypeStatic) + schedulerManager = scheduler.NewManager(zap.NewNop(), procedureManager, f, c.GetMetadata(), client, "/rootPath", true, storage.TopologyTypeStatic, 1) err = schedulerManager.Start(ctx) re.NoError(err) schedulers := schedulerManager.ListScheduler() @@ -47,7 +47,7 @@ func TestSchedulerManager(t *testing.T) { re.NoError(err) // Create scheduler manager with dynamic topology. - schedulerManager = scheduler.NewManager(zap.NewNop(), procedureManager, f, c.GetMetadata(), client, "/rootPath", true, storage.TopologyTypeDynamic) + schedulerManager = scheduler.NewManager(zap.NewNop(), procedureManager, f, c.GetMetadata(), client, "/rootPath", true, storage.TopologyTypeDynamic, 1) err = schedulerManager.Start(ctx) re.NoError(err) schedulers = schedulerManager.ListScheduler() diff --git a/server/coordinator/scheduler/static_topology_shard_scheduler.go b/server/coordinator/scheduler/static_topology_shard_scheduler.go index ceb67a3a..ff4b08bd 100644 --- a/server/coordinator/scheduler/static_topology_shard_scheduler.go +++ b/server/coordinator/scheduler/static_topology_shard_scheduler.go @@ -5,24 +5,30 @@ package scheduler import ( "context" "fmt" + "strings" "time" "github.com/CeresDB/ceresmeta/server/cluster/metadata" "github.com/CeresDB/ceresmeta/server/coordinator" + "github.com/CeresDB/ceresmeta/server/coordinator/procedure" "github.com/CeresDB/ceresmeta/server/storage" "github.com/pkg/errors" ) type StaticTopologyShardScheduler struct { - factory *coordinator.Factory - nodePicker coordinator.NodePicker + factory *coordinator.Factory + nodePicker coordinator.NodePicker + procedureExecutingBatchSize uint32 } -func NewStaticTopologyShardScheduler(factory *coordinator.Factory, nodePicker coordinator.NodePicker) Scheduler { - return &StaticTopologyShardScheduler{factory: factory, nodePicker: nodePicker} +func NewStaticTopologyShardScheduler(factory *coordinator.Factory, nodePicker coordinator.NodePicker, procedureExecutingBatchSize uint32) Scheduler { + return &StaticTopologyShardScheduler{factory: factory, nodePicker: nodePicker, procedureExecutingBatchSize: procedureExecutingBatchSize} } func (s *StaticTopologyShardScheduler) Schedule(ctx context.Context, clusterSnapshot metadata.Snapshot) (ScheduleResult, error) { + var procedures []procedure.Procedure + var reasons strings.Builder + switch clusterSnapshot.Topology.ClusterView.State { case storage.ClusterStateEmpty: return ScheduleResult{}, nil @@ -47,10 +53,11 @@ func (s *StaticTopologyShardScheduler) Schedule(ctx context.Context, clusterSnap if err != nil { return ScheduleResult{}, err } - return ScheduleResult{ - Procedure: p, - Reason: fmt.Sprintf("Cluster initialization, shard:%d is assigned to node:%s", shardView.ShardID, newLeaderNode.Node.Name), - }, nil + procedures = append(procedures, p) + reasons.WriteString(fmt.Sprintf("Cluster initialization, assign shard to node, shardID:%d, nodeName:%s. 
", shardView.ShardID, newLeaderNode.Node.Name)) + if len(procedures) >= int(s.procedureExecutingBatchSize) { + break + } } case storage.ClusterStateStable: for i := 0; i < len(clusterSnapshot.Topology.ClusterView.ShardNodes); i++ { @@ -70,15 +77,28 @@ func (s *StaticTopologyShardScheduler) Schedule(ctx context.Context, clusterSnap if err != nil { return ScheduleResult{}, err } - return ScheduleResult{ - Procedure: p, - Reason: fmt.Sprintf("Cluster state is stable, shard:%d is reopened in node:%s", shardNode.ID, node.Node.Name), - }, nil + procedures = append(procedures, p) + reasons.WriteString(fmt.Sprintf("Cluster initialization, assign shard to node, shardID:%d, nodeName:%s. ", shardNode.ID, node.Node.Name)) + if len(procedures) >= int(s.procedureExecutingBatchSize) { + break + } } } } - return ScheduleResult{}, nil + if len(procedures) == 0 { + return ScheduleResult{}, nil + } + + batchProcedure, err := s.factory.CreateBatchTransferLeaderProcedure(ctx, coordinator.BatchRequest{ + Batch: procedures, + BatchType: procedure.TransferLeader, + }) + if err != nil { + return ScheduleResult{}, err + } + + return ScheduleResult{batchProcedure, reasons.String()}, nil } func findOnlineNodeByName(nodeName string, nodes []metadata.RegisteredNode) (metadata.RegisteredNode, error) { diff --git a/server/coordinator/scheduler/static_topology_shard_scheduler_test.go b/server/coordinator/scheduler/static_topology_shard_scheduler_test.go index 153d5553..c76c997a 100644 --- a/server/coordinator/scheduler/static_topology_shard_scheduler_test.go +++ b/server/coordinator/scheduler/static_topology_shard_scheduler_test.go @@ -19,23 +19,23 @@ func TestStaticTopologyScheduler(t *testing.T) { procedureFactory := coordinator.NewFactory(test.MockIDAllocator{}, test.MockDispatch{}, test.NewTestStorage(t)) - s := scheduler.NewStaticTopologyShardScheduler(procedureFactory, coordinator.NewConsistentHashNodePicker(zap.NewNop(), 50)) + s := scheduler.NewStaticTopologyShardScheduler(procedureFactory, coordinator.NewConsistentHashNodePicker(zap.NewNop(), 50), 1) // EmptyCluster would be scheduled an empty procedure. emptyCluster := test.InitEmptyCluster(ctx, t) result, err := s.Schedule(ctx, emptyCluster.GetMetadata().GetClusterSnapshot()) re.NoError(err) - re.Nil(result.Procedure) + re.Empty(result) // PrepareCluster would be scheduled a transfer leader procedure. prepareCluster := test.InitPrepareCluster(ctx, t) result, err = s.Schedule(ctx, prepareCluster.GetMetadata().GetClusterSnapshot()) re.NoError(err) - re.NotNil(result.Procedure) + re.NotEmpty(result) // StableCluster with all shards assigned would be scheduled a transfer leader procedure by hash rule. 
stableCluster := test.InitStableCluster(ctx, t) result, err = s.Schedule(ctx, stableCluster.GetMetadata().GetClusterSnapshot()) re.NoError(err) - re.NotNil(result.Procedure) + re.NotEmpty(result) } diff --git a/server/server.go b/server/server.go index c77bf62d..051661cf 100644 --- a/server/server.go +++ b/server/server.go @@ -237,11 +237,12 @@ func (srv *Server) createDefaultCluster(ctx context.Context) error { } defaultCluster, err := srv.clusterManager.CreateCluster(ctx, srv.cfg.DefaultClusterName, metadata.CreateClusterOpts{ - NodeCount: uint32(srv.cfg.DefaultClusterNodeCount), - ReplicationFactor: uint32(srv.cfg.DefaultClusterReplicationFactor), - ShardTotal: uint32(srv.cfg.DefaultClusterShardTotal), - EnableSchedule: srv.cfg.EnableSchedule, - TopologyType: topologyType, + NodeCount: uint32(srv.cfg.DefaultClusterNodeCount), + ReplicationFactor: uint32(srv.cfg.DefaultClusterReplicationFactor), + ShardTotal: uint32(srv.cfg.DefaultClusterShardTotal), + EnableSchedule: srv.cfg.EnableSchedule, + TopologyType: topologyType, + ProcedureExecutingBatchSize: srv.cfg.ProcedureExecutingBatchSize, }) if err != nil { log.Warn("create default cluster failed", zap.Error(err)) diff --git a/server/service/grpc/service.go b/server/service/grpc/service.go index 94945f1f..692ffd62 100644 --- a/server/service/grpc/service.go +++ b/server/service/grpc/service.go @@ -297,7 +297,7 @@ func (s *Service) RouteTables(ctx context.Context, req *metaservicepb.RouteTable return &metaservicepb.RouteTablesResponse{Header: responseHeader(err, "grpc routeTables")}, nil } - log.Info("[RouteTable]", zap.String("schemaName", req.SchemaName), zap.String("clusterName", req.GetHeader().ClusterName), zap.String("tableNames", strings.Join(req.TableNames, ","))) + log.Debug("[RouteTable]", zap.String("schemaName", req.SchemaName), zap.String("clusterName", req.GetHeader().ClusterName), zap.String("tableNames", strings.Join(req.TableNames, ","))) // Forward request to the leader. 
if ceresmetaClient != nil { diff --git a/server/service/http/api.go b/server/service/http/api.go index 6eb4aaaa..cd2ed99e 100644 --- a/server/service/http/api.go +++ b/server/service/http/api.go @@ -542,10 +542,11 @@ func (a *API) createCluster(writer http.ResponseWriter, req *http.Request) { } type UpdateClusterRequest struct { - NodeCount uint32 `json:"NodeCount"` - ShardTotal uint32 `json:"ShardTotal"` - EnableSchedule bool `json:"enableSchedule"` - TopologyType string `json:"topologyType"` + NodeCount uint32 `json:"nodeCount"` + ShardTotal uint32 `json:"shardTotal"` + EnableSchedule bool `json:"enableSchedule"` + TopologyType string `json:"topologyType"` + ProcedureExecutingBatchSize uint32 `json:"procedureExecutingBatchSize"` } func (a *API) updateCluster(writer http.ResponseWriter, req *http.Request) { @@ -590,8 +591,9 @@ func (a *API) updateCluster(writer http.ResponseWriter, req *http.Request) { } if err := a.clusterManager.UpdateCluster(req.Context(), clusterName, metadata.UpdateClusterOpts{ - EnableSchedule: updateClusterRequest.EnableSchedule, - TopologyType: topologyType, + EnableSchedule: updateClusterRequest.EnableSchedule, + TopologyType: topologyType, + ProcedureExecutingBatchSize: updateClusterRequest.ProcedureExecutingBatchSize, }); err != nil { log.Error("update cluster failed", zap.Error(err)) a.respondError(writer, metadata.ErrUpdateCluster, fmt.Sprintf("err: %s", err.Error())) diff --git a/server/storage/types.go b/server/storage/types.go index a80cb826..02ef7857 100644 --- a/server/storage/types.go +++ b/server/storage/types.go @@ -152,12 +152,13 @@ type Cluster struct { Name string MinNodeCount uint32 // Deprecated: ReplicationFactor is deprecated after CeresMeta v1.2.0 - ReplicationFactor uint32 - ShardTotal uint32 - EnableSchedule bool - TopologyType TopologyType - CreatedAt uint64 - ModifiedAt uint64 + ReplicationFactor uint32 + ShardTotal uint32 + EnableSchedule bool + TopologyType TopologyType + ProcedureExecutingBatchSize uint32 + CreatedAt uint64 + ModifiedAt uint64 } type ShardNode struct { @@ -258,27 +259,29 @@ func convertNodeToPB(node Node) clusterpb.Node { func convertClusterPB(cluster *clusterpb.Cluster) Cluster { return Cluster{ - ID: ClusterID(cluster.Id), - Name: cluster.Name, - MinNodeCount: cluster.MinNodeCount, - ShardTotal: cluster.ShardTotal, - EnableSchedule: cluster.EnableSchedule, - TopologyType: convertTopologyTypePB(cluster.TopologyType), - CreatedAt: cluster.CreatedAt, - ModifiedAt: cluster.ModifiedAt, + ID: ClusterID(cluster.Id), + Name: cluster.Name, + MinNodeCount: cluster.MinNodeCount, + ShardTotal: cluster.ShardTotal, + EnableSchedule: cluster.EnableSchedule, + TopologyType: convertTopologyTypePB(cluster.TopologyType), + ProcedureExecutingBatchSize: cluster.ProcedureExecutingBatchSize, + CreatedAt: cluster.CreatedAt, + ModifiedAt: cluster.ModifiedAt, } } func convertClusterToPB(cluster Cluster) clusterpb.Cluster { return clusterpb.Cluster{ - Id: uint32(cluster.ID), - Name: cluster.Name, - MinNodeCount: cluster.MinNodeCount, - ShardTotal: cluster.ShardTotal, - EnableSchedule: cluster.EnableSchedule, - TopologyType: convertTopologyTypeToPB(cluster.TopologyType), - CreatedAt: cluster.CreatedAt, - ModifiedAt: cluster.ModifiedAt, + Id: uint32(cluster.ID), + Name: cluster.Name, + MinNodeCount: cluster.MinNodeCount, + ShardTotal: cluster.ShardTotal, + EnableSchedule: cluster.EnableSchedule, + TopologyType: convertTopologyTypeToPB(cluster.TopologyType), + ProcedureExecutingBatchSize: cluster.ProcedureExecutingBatchSize, + CreatedAt: 
cluster.CreatedAt, + ModifiedAt: cluster.ModifiedAt, } }
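
As context for the batching change above, here is a minimal standalone Go sketch (simplified names, not the repo's actual types) of the pattern the schedulers now follow: collect at most `procedureExecutingBatchSize` per-shard procedures, then run the whole batch concurrently with `errgroup`, failing the batch if any item fails, as `BatchTransferLeaderProcedure.Start` does.

```go
package main

import (
	"context"
	"fmt"

	"golang.org/x/sync/errgroup"
)

// item stands in for a single transfer-leader procedure in this sketch.
type item struct {
	shardID uint32
}

func (i item) run(_ context.Context) error {
	fmt.Printf("transfer leader for shard %d\n", i.shardID)
	return nil
}

// schedule collects at most batchSize items, like the schedulers in this diff.
func schedule(shards []uint32, batchSize uint32) []item {
	var batch []item
	for _, id := range shards {
		batch = append(batch, item{shardID: id})
		if len(batch) >= int(batchSize) {
			break
		}
	}
	return batch
}

// runBatch mirrors BatchTransferLeaderProcedure.Start: one goroutine per item,
// and the whole batch fails if any item returns an error.
func runBatch(ctx context.Context, batch []item) error {
	g, ctx := errgroup.WithContext(ctx)
	for _, it := range batch {
		it := it // capture loop variable (pre-Go 1.22 semantics)
		g.Go(func() error {
			return it.run(ctx)
		})
	}
	return g.Wait()
}

func main() {
	batch := schedule([]uint32{0, 1, 2, 3, 4}, 3)
	if err := runBatch(context.Background(), batch); err != nil {
		fmt.Println("batch failed:", err)
	}
}
```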
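The consistency rule enforced by `buildBatchRelatedVersionInfo` can be summarized in a short standalone sketch (simplified types, not the repo's API): every procedure in a batch must agree on the cluster ID, the cluster version, and the version of any shard they have in common.

```go
package main

import (
	"errors"
	"fmt"
)

type versionInfo struct {
	clusterID      uint32
	clusterVersion uint64
	shardVersions  map[uint32]uint64
}

// mergeVersionInfo rejects batches whose members disagree on cluster or shard versions.
func mergeVersionInfo(batch []versionInfo) (versionInfo, error) {
	if len(batch) == 0 {
		return versionInfo{}, errors.New("empty batch")
	}
	result := versionInfo{
		clusterID:      batch[0].clusterID,
		clusterVersion: batch[0].clusterVersion,
		shardVersions:  map[uint32]uint64{},
	}
	for _, v := range batch {
		if v.clusterID != result.clusterID || v.clusterVersion != result.clusterVersion {
			return versionInfo{}, errors.New("inconsistent cluster ID or cluster version in batch")
		}
		for shardID, version := range v.shardVersions {
			if existing, ok := result.shardVersions[shardID]; ok && existing != version {
				return versionInfo{}, fmt.Errorf("inconsistent version for shard %d: %d vs %d", shardID, existing, version)
			}
			result.shardVersions[shardID] = version
		}
	}
	return result, nil
}

func main() {
	merged, err := mergeVersionInfo([]versionInfo{
		{clusterID: 0, clusterVersion: 1, shardVersions: map[uint32]uint64{0: 7}},
		{clusterID: 0, clusterVersion: 1, shardVersions: map[uint32]uint64{1: 3}},
	})
	fmt.Println(merged, err)
}
```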
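Finally, a hedged example of exercising the new `procedureExecutingBatchSize` field on the update-cluster HTTP API: the JSON field names come from the `UpdateClusterRequest` struct tags in `server/service/http/api.go`, but the URL path, port, and cluster name below are placeholders, not taken from this diff.

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"net/http"
)

// Mirrors the JSON shape of UpdateClusterRequest after this change.
type updateClusterRequest struct {
	NodeCount                   uint32 `json:"nodeCount"`
	ShardTotal                  uint32 `json:"shardTotal"`
	EnableSchedule              bool   `json:"enableSchedule"`
	TopologyType                string `json:"topologyType"`
	ProcedureExecutingBatchSize uint32 `json:"procedureExecutingBatchSize"`
}

func main() {
	body, _ := json.Marshal(updateClusterRequest{
		EnableSchedule:              true,
		TopologyType:                "dynamic",
		ProcedureExecutingBatchSize: 32,
	})
	// Placeholder URL: substitute the real CeresMeta HTTP address and cluster route.
	resp, err := http.Post("http://127.0.0.1:8080/api/v1/clusters/defaultCluster", "application/json", bytes.NewReader(body))
	if err != nil {
		fmt.Println("request failed:", err)
		return
	}
	defer resp.Body.Close()
	fmt.Println("status:", resp.Status)
}
```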