This repository has been archived by the owner on Feb 6, 2024. It is now read-only.

Commit

refactor: refactor by cr
ZuLiangWang committed Jul 7, 2023
1 parent 4c8c955 commit 738afd4
Showing 23 changed files with 204 additions and 170 deletions.
8 changes: 4 additions & 4 deletions go.mod
@@ -4,7 +4,7 @@ go 1.19

require (
github.com/AlekSi/gocov-xml v1.0.0
github.com/CeresDB/ceresdbproto/golang v0.0.0-20230531074927-f60b148b3a5c
github.com/CeresDB/ceresdbproto/golang v0.0.0-20230707021308-fba75ce6409c
github.com/axw/gocov v1.1.0
github.com/caarlos0/env/v6 v6.10.1
github.com/julienschmidt/httprouter v1.3.0
@@ -18,6 +18,8 @@ require (
go.etcd.io/etcd/client/v3 v3.5.4
go.etcd.io/etcd/server/v3 v3.5.4
go.uber.org/zap v1.21.0
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
golang.org/x/time v0.0.0-20210220033141-f8bda1e9f3ba
golang.org/x/tools v0.1.10
google.golang.org/grpc v1.47.0
google.golang.org/protobuf v1.28.0
@@ -83,17 +85,15 @@ require (
go.opentelemetry.io/otel/sdk/export/metric v0.20.0 // indirect
go.opentelemetry.io/otel/sdk/metric v0.20.0 // indirect
go.opentelemetry.io/otel/trace v0.20.0 // indirect
go.opentelemetry.io/proto/otlp v0.7.0 // indirect
go.uber.org/atomic v1.9.0 // indirect
go.uber.org/multierr v1.7.0 // indirect
golang.org/x/crypto v0.0.0-20220131195533-30dcbda58838 // indirect
golang.org/x/mod v0.6.0-dev.0.20220106191415-9b9b3d81d5e3 // indirect
golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2 // indirect
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c // indirect
golang.org/x/sys v0.0.0-20211019181941-9d821ace8654 // indirect
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1 // indirect
golang.org/x/text v0.3.7 // indirect
golang.org/x/time v0.0.0-20210220033141-f8bda1e9f3ba // indirect
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect
google.golang.org/genproto v0.0.0-20210602131652-f16073e35f0c // indirect
gopkg.in/natefinch/lumberjack.v2 v2.0.0 // indirect
8 changes: 4 additions & 4 deletions go.sum
@@ -18,10 +18,10 @@ github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03
github.com/BurntSushi/toml v1.1.0 h1:ksErzDEI1khOiGPgpwuI7x2ebx/uXQNw7xJpn9Eq1+I=
github.com/BurntSushi/toml v1.1.0/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbicEuybxQ=
github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo=
github.com/CeresDB/ceresdbproto/golang v0.0.0-20230515021908-1b3a3eae3d60 h1:+/bcJ6M6SnXWjhA80c5Qq6u+LASrPGxoDCMIZoJcmaQ=
github.com/CeresDB/ceresdbproto/golang v0.0.0-20230515021908-1b3a3eae3d60/go.mod h1:qLTh6jtSu2ZLIFsU3iiDIKvkrQvyY/Csg6Mk0Ub0QZ4=
github.com/CeresDB/ceresdbproto/golang v0.0.0-20230531074927-f60b148b3a5c h1:Z/FkMasq2ZTcsKsFuiUaLi26mLyx23mxwlbt1NC/eRY=
github.com/CeresDB/ceresdbproto/golang v0.0.0-20230531074927-f60b148b3a5c/go.mod h1:qLTh6jtSu2ZLIFsU3iiDIKvkrQvyY/Csg6Mk0Ub0QZ4=
github.com/CeresDB/ceresdbproto/golang v0.0.0-20230707015251-b771ff78a281 h1:F0pmf85TCDi6/UJoika1eYGDLuJSdEGvOKMb7Ha3u3s=
github.com/CeresDB/ceresdbproto/golang v0.0.0-20230707015251-b771ff78a281/go.mod h1:qLTh6jtSu2ZLIFsU3iiDIKvkrQvyY/Csg6Mk0Ub0QZ4=
github.com/CeresDB/ceresdbproto/golang v0.0.0-20230707021308-fba75ce6409c h1:7gmSQsGua+Y1g6ygsC/K75T/zK2ki7y5R5BkrN1/Ymc=
github.com/CeresDB/ceresdbproto/golang v0.0.0-20230707021308-fba75ce6409c/go.mod h1:qLTh6jtSu2ZLIFsU3iiDIKvkrQvyY/Csg6Mk0Ub0QZ4=
github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU=
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
2 changes: 1 addition & 1 deletion server/cluster/cluster.go
@@ -39,7 +39,7 @@ func NewCluster(logger *zap.Logger, metadata *metadata.ClusterMetadata, client *
dispatch := eventdispatch.NewDispatchImpl()
procedureFactory := coordinator.NewFactory(id.NewAllocatorImpl(logger, client, defaultProcedurePrefixKey, defaultAllocStep), dispatch, procedureStorage)

schedulerManager := scheduler.NewManager(logger, procedureManager, procedureFactory, metadata, client, rootPath, metadata.GetEnableSchedule(), metadata.GetTopologyType())
schedulerManager := scheduler.NewManager(logger, procedureManager, procedureFactory, metadata, client, rootPath, metadata.GetEnableSchedule(), metadata.GetTopologyType(), metadata.GetProcedureExecutingBatchSize())

return &Cluster{
logger: logger,
51 changes: 27 additions & 24 deletions server/cluster/manager.go
@@ -112,14 +112,15 @@ func (m *managerImpl) CreateCluster(ctx context.Context, clusterName string, opt

createTime := time.Now().UnixMilli()
clusterMetadataStorage := storage.Cluster{
ID: clusterID,
Name: clusterName,
MinNodeCount: opts.NodeCount,
ShardTotal: opts.ShardTotal,
EnableSchedule: opts.EnableSchedule,
TopologyType: opts.TopologyType,
CreatedAt: uint64(createTime),
ModifiedAt: uint64(createTime),
ID: clusterID,
Name: clusterName,
MinNodeCount: opts.NodeCount,
ShardTotal: opts.ShardTotal,
EnableSchedule: opts.EnableSchedule,
TopologyType: opts.TopologyType,
ProcedureExecutingBatchSize: opts.ProcedureExecutingBatchSize,
CreatedAt: uint64(createTime),
ModifiedAt: uint64(createTime),
}
err = m.storage.CreateCluster(ctx, storage.CreateClusterRequest{
Cluster: clusterMetadataStorage,
@@ -164,14 +165,15 @@ func (m *managerImpl) UpdateCluster(ctx context.Context, clusterName string, opt
}

err = m.storage.UpdateCluster(ctx, storage.UpdateClusterRequest{Cluster: storage.Cluster{
ID: c.GetMetadata().GetClusterID(),
Name: c.GetMetadata().Name(),
MinNodeCount: c.GetMetadata().GetClusterMinNodeCount(),
ShardTotal: c.GetMetadata().GetTotalShardNum(),
EnableSchedule: opt.EnableSchedule,
TopologyType: opt.TopologyType,
CreatedAt: c.GetMetadata().GetCreateTime(),
ModifiedAt: uint64(time.Now().UnixMilli()),
ID: c.GetMetadata().GetClusterID(),
Name: c.GetMetadata().Name(),
MinNodeCount: c.GetMetadata().GetClusterMinNodeCount(),
ShardTotal: c.GetMetadata().GetTotalShardNum(),
EnableSchedule: opt.EnableSchedule,
TopologyType: opt.TopologyType,
ProcedureExecutingBatchSize: opt.ProcedureExecutingBatchSize,
CreatedAt: c.GetMetadata().GetCreateTime(),
ModifiedAt: uint64(time.Now().UnixMilli()),
}})
if err != nil {
log.Error("update cluster", zap.Error(err))
@@ -333,14 +335,15 @@ func (m *managerImpl) Start(ctx context.Context) error {
if clusterMetadata.GetStorageMetadata().TopologyType == storage.TopologyTypeUnknown {
req := storage.UpdateClusterRequest{
Cluster: storage.Cluster{
ID: metadataStorage.ID,
Name: metadataStorage.Name,
MinNodeCount: metadataStorage.MinNodeCount,
ShardTotal: metadataStorage.ShardTotal,
EnableSchedule: metadataStorage.EnableSchedule,
TopologyType: m.topologyType,
CreatedAt: metadataStorage.CreatedAt,
ModifiedAt: uint64(time.Now().UnixMilli()),
ID: metadataStorage.ID,
Name: metadataStorage.Name,
MinNodeCount: metadataStorage.MinNodeCount,
ShardTotal: metadataStorage.ShardTotal,
EnableSchedule: metadataStorage.EnableSchedule,
TopologyType: m.topologyType,
ProcedureExecutingBatchSize: metadataStorage.ProcedureExecutingBatchSize,
CreatedAt: metadataStorage.CreatedAt,
ModifiedAt: uint64(time.Now().UnixMilli()),
},
}
if err := m.storage.UpdateCluster(ctx, req); err != nil {
7 changes: 7 additions & 0 deletions server/cluster/metadata/cluster_metadata.go
@@ -576,6 +576,13 @@ func (c *ClusterMetadata) GetTopologyType() storage.TopologyType {
return c.metaData.TopologyType
}

func (c *ClusterMetadata) GetProcedureExecutingBatchSize() uint32 {
c.lock.RLock()
defer c.lock.RUnlock()

return c.metaData.ProcedureExecutingBatchSize
}

func (c *ClusterMetadata) GetCreateTime() uint64 {
c.lock.RLock()
defer c.lock.RUnlock()
16 changes: 9 additions & 7 deletions server/cluster/metadata/types.go
@@ -47,16 +47,18 @@ type ShardNodeWithVersion struct {
}

type CreateClusterOpts struct {
NodeCount uint32
ReplicationFactor uint32
ShardTotal uint32
EnableSchedule bool
TopologyType storage.TopologyType
NodeCount uint32
ReplicationFactor uint32
ShardTotal uint32
EnableSchedule bool
TopologyType storage.TopologyType
ProcedureExecutingBatchSize uint32
}

type UpdateClusterOpts struct {
EnableSchedule bool
TopologyType storage.TopologyType
EnableSchedule bool
TopologyType storage.TopologyType
ProcedureExecutingBatchSize uint32
}

type CreateTableMetadataRequest struct {
7 changes: 6 additions & 1 deletion server/config/config.go
@@ -5,6 +5,7 @@ package config
import (
"flag"
"fmt"
"math"
"os"
"strings"
"time"
@@ -52,7 +53,8 @@
defaultClusterShardTotal = 8
enableSchedule = true
// topologyType is used to determine the scheduling cluster strategy of CeresMeta. It should be determined according to the storage method of CeresDB. The default is static to support local storage.
defaultTopologyType = "static"
defaultTopologyType = "static"
defaultProcedureExecutingBatchSize = math.MaxUint32

defaultHTTPPort = 8080

@@ -127,6 +129,8 @@ type Config struct {
EnableSchedule bool `toml:"enable-schedule" env:"ENABLE_SCHEDULE"`
// TopologyType indicates the schedule type used by the CeresDB cluster, it will determine the strategy of CeresMeta scheduling cluster.
TopologyType string `toml:"topology-type" env:"TOPOLOGY_TYPE"`
// ProcedureExecutingBatchSize determines the maximum number of shards in a single batch when opening shards concurrently.
ProcedureExecutingBatchSize uint32 `toml:"procedure-executing-batch-size" env:"PROCEDURE_EXECUTING_BATCH_SIZE"`

ClientUrls string `toml:"client-urls" env:"CLIENT_URLS"`
PeerUrls string `toml:"peer-urls" env:"PEER_URLS"`
@@ -296,6 +300,7 @@ func MakeConfigParser() (*Parser, error) {
DefaultClusterShardTotal: defaultClusterShardTotal,
EnableSchedule: enableSchedule,
TopologyType: defaultTopologyType,
ProcedureExecutingBatchSize: defaultProcedureExecutingBatchSize,

HTTPPort: defaultHTTPPort,
}
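Note on the new option: `defaultProcedureExecutingBatchSize` is `math.MaxUint32`, so batching is effectively unlimited unless `procedure-executing-batch-size` (or the `PROCEDURE_EXECUTING_BATCH_SIZE` environment variable) is set. Below is a minimal sketch of how the parsed value might be carried into the cluster options added in `types.go`; the actual call sites are in files not displayed on this page, and the stand-in types and values here are assumptions.

```go
// Hypothetical wiring sketch, not the project's actual call site: it only shows the
// new ProcedureExecutingBatchSize value flowing from parsed config into cluster options.
package main

import (
	"fmt"
	"math"
)

// Simplified stand-ins for the Config and CreateClusterOpts types shown in the diff.
type Config struct {
	EnableSchedule              bool
	TopologyType                string
	ProcedureExecutingBatchSize uint32
}

type CreateClusterOpts struct {
	NodeCount                   uint32
	ShardTotal                  uint32
	EnableSchedule              bool
	TopologyType                string
	ProcedureExecutingBatchSize uint32
}

func main() {
	// math.MaxUint32 mirrors defaultProcedureExecutingBatchSize: effectively "no limit".
	cfg := Config{EnableSchedule: true, TopologyType: "static", ProcedureExecutingBatchSize: math.MaxUint32}

	opts := CreateClusterOpts{
		NodeCount:                   2,
		ShardTotal:                  8,
		EnableSchedule:              cfg.EnableSchedule,
		TopologyType:                cfg.TopologyType,
		ProcedureExecutingBatchSize: cfg.ProcedureExecutingBatchSize,
	}
	fmt.Printf("procedure executing batch size: %d\n", opts.ProcedureExecutingBatchSize)
}
```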
1 change: 1 addition & 0 deletions server/coordinator/procedure/manager_impl.go
@@ -75,6 +75,7 @@ func (m *ManagerImpl) Stop(ctx context.Context) error {
return nil
}

// TODO: Filter duplicate submitted procedures.
func (m *ManagerImpl) Submit(_ context.Context, procedure Procedure) error {
if err := m.waitingProcedures.Push(procedure, 0); err != nil {
return err
@@ -63,7 +63,7 @@ func buildBatchRelatedVersionInfo(batch []procedure.Procedure) (procedure.Relate
for shardID, version := range p.RelatedVersionInfo().ShardWithVersion {
if resultVersion, exists := result.ShardWithVersion[shardID]; exists {
if version != resultVersion {
return procedure.RelatedVersionInfo{}, errors.WithMessage(procedure.ErrMergeBatchProcedure, "procedure shardVersion in the same batch is inconsistent")
return procedure.RelatedVersionInfo{}, errors.WithMessage(procedure.ErrMergeBatchProcedure, fmt.Sprintf("procedure shardVersion in the same batch is inconsistent, shardID: %d, expectedShardVersion: %d, shardVersion: %d", shardID, version, resultVersion))
}
} else {
result.ShardWithVersion[shardID] = version
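For readers skimming the diff: the changed error message above belongs to the check that all procedures merged into one batch agree on the version of every shard they touch. A self-contained sketch of that merge rule, with simplified types (the real code works on `procedure.RelatedVersionInfo` and wraps `ErrMergeBatchProcedure`), might look like this:

```go
package main

import "fmt"

type ShardID uint32

// mergeShardVersions merges the shard->version maps of procedures in a batch and
// fails when two procedures disagree on the same shard's version, mirroring the
// consistency check in buildBatchRelatedVersionInfo (types simplified for brevity).
func mergeShardVersions(batch []map[ShardID]uint64) (map[ShardID]uint64, error) {
	result := map[ShardID]uint64{}
	for _, shardWithVersion := range batch {
		for shardID, version := range shardWithVersion {
			if existing, ok := result[shardID]; ok {
				if existing != version {
					return nil, fmt.Errorf("procedure shardVersion in the same batch is inconsistent, shardID: %d, expectedShardVersion: %d, shardVersion: %d", shardID, existing, version)
				}
				continue
			}
			result[shardID] = version
		}
	}
	return result, nil
}

func main() {
	// Two procedures touching shard 0 at version 1 merge cleanly; a third at version 2 fails.
	_, err := mergeShardVersions([]map[ShardID]uint64{
		{0: 1}, {0: 1}, {0: 2},
	})
	fmt.Println(err)
}
```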
@@ -53,34 +53,23 @@ func (m mockProcedure) Priority() procedure.Priority {

func TestBatchProcedure(t *testing.T) {
re := require.New(t)

var procedures []procedure.Procedure

// Procedures with same type and version.
for i := 0; i < 3; i++ {
shardWithVersion := map[storage.ShardID]uint64{}
shardWithVersion[storage.ShardID(i)] = 0
p := mockProcedure{
ClusterID: 0,
clusterVersion: 0,
typ: 0,
ShardWithVersion: shardWithVersion,
}
p := CreateMockProcedure(storage.ClusterID(0), 0, 0, shardWithVersion)
procedures = append(procedures, p)
}
_, err := transferleader.NewBatchTransferLeaderProcedure(0, procedures)
re.NoError(err)

// Procedure with different clusterID
// Procedure with different clusterID.
for i := 0; i < 3; i++ {
shardWithVersion := map[storage.ShardID]uint64{}
shardWithVersion[storage.ShardID(i)] = 0
p := mockProcedure{
ClusterID: storage.ClusterID(i),
clusterVersion: 0,
typ: procedure.TransferLeader,
ShardWithVersion: shardWithVersion,
}
p := CreateMockProcedure(storage.ClusterID(i), 0, procedure.TransferLeader, shardWithVersion)
procedures = append(procedures, p)
}
_, err = transferleader.NewBatchTransferLeaderProcedure(0, procedures)
@@ -90,12 +79,7 @@ func TestBatchProcedure(t *testing.T) {
for i := 0; i < 3; i++ {
shardWithVersion := map[storage.ShardID]uint64{}
shardWithVersion[storage.ShardID(i)] = 0
p := mockProcedure{
ClusterID: 0,
clusterVersion: 0,
typ: procedure.Typ(i),
ShardWithVersion: shardWithVersion,
}
p := CreateMockProcedure(0, 0, procedure.Typ(i), shardWithVersion)
procedures = append(procedures, p)
}
_, err = transferleader.NewBatchTransferLeaderProcedure(0, procedures)
@@ -105,14 +89,18 @@
for i := 0; i < 3; i++ {
shardWithVersion := map[storage.ShardID]uint64{}
shardWithVersion[storage.ShardID(0)] = uint64(i)
p := mockProcedure{
ClusterID: 0,
clusterVersion: 0,
typ: procedure.Typ(i),
ShardWithVersion: shardWithVersion,
}
p := CreateMockProcedure(0, 0, procedure.Typ(i), shardWithVersion)
procedures = append(procedures, p)
}
_, err = transferleader.NewBatchTransferLeaderProcedure(0, procedures)
re.Error(err)
}

func CreateMockProcedure(clusterID storage.ClusterID, clusterVersion uint64, typ procedure.Typ, shardWithVersion map[storage.ShardID]uint64) procedure.Procedure {
return mockProcedure{
ClusterID: clusterID,
clusterVersion: clusterVersion,
typ: typ,
ShardWithVersion: shardWithVersion,
}
}
37 changes: 20 additions & 17 deletions server/coordinator/procedure/test/common.go
@@ -6,6 +6,7 @@ import (
"context"
"crypto/rand"
"fmt"
"math"
"math/big"
"testing"
"time"
@@ -21,16 +22,17 @@ import (
)

const (
TestTableName0 = "table0"
TestTableName1 = "table1"
TestSchemaName = "TestSchemaName"
TestRootPath = "/rootPath"
DefaultIDAllocatorStep = 20
ClusterName = "ceresdbCluster1"
DefaultNodeCount = 2
DefaultShardTotal = 4
DefaultSchedulerOperator = true
DefaultTopologyType = "static"
TestTableName0 = "table0"
TestTableName1 = "table1"
TestSchemaName = "TestSchemaName"
TestRootPath = "/rootPath"
DefaultIDAllocatorStep = 20
ClusterName = "ceresdbCluster1"
DefaultNodeCount = 2
DefaultShardTotal = 4
DefaultSchedulerOperator = true
DefaultTopologyType = "static"
DefaultProcedureExecutingBatchSize = math.MaxUint32
)

type MockDispatch struct{}
@@ -99,13 +101,14 @@ func InitEmptyCluster(ctx context.Context, t *testing.T) *cluster.Cluster {
logger := zap.NewNop()

clusterMetadata := metadata.NewClusterMetadata(logger, storage.Cluster{
ID: 0,
Name: ClusterName,
MinNodeCount: DefaultNodeCount,
ShardTotal: DefaultShardTotal,
EnableSchedule: DefaultSchedulerOperator,
TopologyType: DefaultTopologyType,
CreatedAt: 0,
ID: 0,
Name: ClusterName,
MinNodeCount: DefaultNodeCount,
ShardTotal: DefaultShardTotal,
EnableSchedule: DefaultSchedulerOperator,
TopologyType: DefaultTopologyType,
ProcedureExecutingBatchSize: DefaultProcedureExecutingBatchSize,
CreatedAt: 0,
}, clusterStorage, client, TestRootPath, DefaultIDAllocatorStep)

err := clusterMetadata.Init(ctx)
17 changes: 11 additions & 6 deletions server/coordinator/scheduler/assign_shard_scheduler.go
@@ -15,14 +15,16 @@ import (

// AssignShardScheduler is used to assign shards that are not yet assigned to any node.
type AssignShardScheduler struct {
factory *coordinator.Factory
nodePicker coordinator.NodePicker
factory *coordinator.Factory
nodePicker coordinator.NodePicker
procedureExecutingBatchSize uint32
}

func NewAssignShardScheduler(factory *coordinator.Factory, nodePicker coordinator.NodePicker) Scheduler {
func NewAssignShardScheduler(factory *coordinator.Factory, nodePicker coordinator.NodePicker, procedureExecutingBatchSize uint32) Scheduler {
return &AssignShardScheduler{
factory: factory,
nodePicker: nodePicker,
factory: factory,
nodePicker: nodePicker,
procedureExecutingBatchSize: procedureExecutingBatchSize,
}
}

@@ -55,7 +57,10 @@ func (a AssignShardScheduler) Schedule(ctx context.Context, clusterSnapshot meta
}

procedures = append(procedures, p)
reasons.WriteString(fmt.Sprintf("the shard:%d is not assigned to any node, try to assign it to node:%s", shardView.ShardID, newLeaderNode.Node.Name))
reasons.WriteString(fmt.Sprintf("the shard is not assigned to any node, try to assign it to node, shardID:%d, node:%s.", shardView.ShardID, newLeaderNode.Node.Name))
if len(procedures) >= int(a.procedureExecutingBatchSize) {
break
}
}

if len(procedures) == 0 {
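The added lines above are the heart of the change: a scheduling round now stops creating procedures once `procedureExecutingBatchSize` is reached. A simplified, self-contained sketch of that control flow (shard and procedure types are stand-ins, and the real scheduler builds transfer-leader procedures through the coordinator factory) could be:

```go
package main

import "fmt"

// Stand-in types; the real scheduler works with metadata.Snapshot and procedure.Procedure.
type shardView struct {
	ShardID  uint32
	Assigned bool
}

type procedure struct {
	ShardID uint32
	Node    string
}

// scheduleAssign assigns unassigned shards to a node, but never emits more than
// procedureExecutingBatchSize procedures in a single scheduling round.
func scheduleAssign(shards []shardView, node string, procedureExecutingBatchSize uint32) []procedure {
	var procedures []procedure
	for _, s := range shards {
		if s.Assigned {
			continue
		}
		procedures = append(procedures, procedure{ShardID: s.ShardID, Node: node})
		if len(procedures) >= int(procedureExecutingBatchSize) {
			break // cap the number of procedures submitted per round
		}
	}
	return procedures
}

func main() {
	shards := []shardView{{0, false}, {1, false}, {2, false}, {3, true}}
	fmt.Println(scheduleAssign(shards, "node0", 2)) // at most 2 procedures per round
}
```

With the default of `math.MaxUint32`, the break is effectively never hit, which preserves the previous behaviour.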
