This repository has been archived by the owner on Feb 6, 2024. It is now read-only.

refactor: support open shard concurrently #205

Merged
5 commits merged on Jul 7, 2023
6 changes: 3 additions & 3 deletions go.mod
@@ -4,7 +4,7 @@ go 1.19

require (
github.com/AlekSi/gocov-xml v1.0.0
github.com/CeresDB/ceresdbproto/golang v0.0.0-20230531074927-f60b148b3a5c
github.com/CeresDB/ceresdbproto/golang v0.0.0-20230707021308-fba75ce6409c
github.com/axw/gocov v1.1.0
github.com/caarlos0/env/v6 v6.10.1
github.com/julienschmidt/httprouter v1.3.0
@@ -18,6 +18,8 @@ require (
go.etcd.io/etcd/client/v3 v3.5.4
go.etcd.io/etcd/server/v3 v3.5.4
go.uber.org/zap v1.21.0
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
golang.org/x/time v0.0.0-20210220033141-f8bda1e9f3ba
golang.org/x/tools v0.1.10
google.golang.org/grpc v1.47.0
google.golang.org/protobuf v1.28.0
@@ -89,11 +91,9 @@ require (
golang.org/x/crypto v0.0.0-20220131195533-30dcbda58838 // indirect
golang.org/x/mod v0.6.0-dev.0.20220106191415-9b9b3d81d5e3 // indirect
golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2 // indirect
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c // indirect
golang.org/x/sys v0.0.0-20211019181941-9d821ace8654 // indirect
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1 // indirect
golang.org/x/text v0.3.7 // indirect
golang.org/x/time v0.0.0-20210220033141-f8bda1e9f3ba // indirect
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect
google.golang.org/genproto v0.0.0-20210602131652-f16073e35f0c // indirect
gopkg.in/natefinch/lumberjack.v2 v2.0.0 // indirect
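golang.org/x/sync moves from an indirect to a direct dependency because the new batch procedure fans out its sub-procedures with errgroup. A minimal, self-contained sketch of that pattern (the shard values are illustrative, not from this PR):

```go
package main

import (
	"context"
	"fmt"

	"golang.org/x/sync/errgroup"
)

func main() {
	// errgroup.WithContext returns a derived context that is cancelled as
	// soon as any goroutine in the group returns a non-nil error.
	g, ctx := errgroup.WithContext(context.Background())

	for _, shard := range []int{1, 2, 3} {
		shard := shard // capture the loop variable (required before Go 1.22)
		g.Go(func() error {
			select {
			case <-ctx.Done():
				// A sibling goroutine already failed; stop early.
				return ctx.Err()
			default:
				fmt.Println("opened shard", shard)
				return nil
			}
		})
	}

	// Wait blocks until all goroutines finish and returns the first error.
	if err := g.Wait(); err != nil {
		fmt.Println("batch failed:", err)
	}
}
```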
6 changes: 2 additions & 4 deletions go.sum
@@ -18,10 +18,8 @@ github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03
github.com/BurntSushi/toml v1.1.0 h1:ksErzDEI1khOiGPgpwuI7x2ebx/uXQNw7xJpn9Eq1+I=
github.com/BurntSushi/toml v1.1.0/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbicEuybxQ=
github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo=
github.com/CeresDB/ceresdbproto/golang v0.0.0-20230515021908-1b3a3eae3d60 h1:+/bcJ6M6SnXWjhA80c5Qq6u+LASrPGxoDCMIZoJcmaQ=
github.com/CeresDB/ceresdbproto/golang v0.0.0-20230515021908-1b3a3eae3d60/go.mod h1:qLTh6jtSu2ZLIFsU3iiDIKvkrQvyY/Csg6Mk0Ub0QZ4=
github.com/CeresDB/ceresdbproto/golang v0.0.0-20230531074927-f60b148b3a5c h1:Z/FkMasq2ZTcsKsFuiUaLi26mLyx23mxwlbt1NC/eRY=
github.com/CeresDB/ceresdbproto/golang v0.0.0-20230531074927-f60b148b3a5c/go.mod h1:qLTh6jtSu2ZLIFsU3iiDIKvkrQvyY/Csg6Mk0Ub0QZ4=
github.com/CeresDB/ceresdbproto/golang v0.0.0-20230707021308-fba75ce6409c h1:7gmSQsGua+Y1g6ygsC/K75T/zK2ki7y5R5BkrN1/Ymc=
github.com/CeresDB/ceresdbproto/golang v0.0.0-20230707021308-fba75ce6409c/go.mod h1:qLTh6jtSu2ZLIFsU3iiDIKvkrQvyY/Csg6Mk0Ub0QZ4=
github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU=
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
2 changes: 1 addition & 1 deletion server/cluster/cluster.go
@@ -39,7 +39,7 @@ func NewCluster(logger *zap.Logger, metadata *metadata.ClusterMetadata, client *
dispatch := eventdispatch.NewDispatchImpl()
procedureFactory := coordinator.NewFactory(id.NewAllocatorImpl(logger, client, defaultProcedurePrefixKey, defaultAllocStep), dispatch, procedureStorage)

schedulerManager := scheduler.NewManager(logger, procedureManager, procedureFactory, metadata, client, rootPath, metadata.GetEnableSchedule(), metadata.GetTopologyType())
schedulerManager := scheduler.NewManager(logger, procedureManager, procedureFactory, metadata, client, rootPath, metadata.GetEnableSchedule(), metadata.GetTopologyType(), metadata.GetProcedureExecutingBatchSize())

return &Cluster{
logger: logger,
51 changes: 27 additions & 24 deletions server/cluster/manager.go
@@ -112,14 +112,15 @@ func (m *managerImpl) CreateCluster(ctx context.Context, clusterName string, opt

createTime := time.Now().UnixMilli()
clusterMetadataStorage := storage.Cluster{
ID: clusterID,
Name: clusterName,
MinNodeCount: opts.NodeCount,
ShardTotal: opts.ShardTotal,
EnableSchedule: opts.EnableSchedule,
TopologyType: opts.TopologyType,
CreatedAt: uint64(createTime),
ModifiedAt: uint64(createTime),
ID: clusterID,
Name: clusterName,
MinNodeCount: opts.NodeCount,
ShardTotal: opts.ShardTotal,
EnableSchedule: opts.EnableSchedule,
TopologyType: opts.TopologyType,
ProcedureExecutingBatchSize: opts.ProcedureExecutingBatchSize,
CreatedAt: uint64(createTime),
ModifiedAt: uint64(createTime),
}
err = m.storage.CreateCluster(ctx, storage.CreateClusterRequest{
Cluster: clusterMetadataStorage,
@@ -164,14 +165,15 @@ func (m *managerImpl) UpdateCluster(ctx context.Context, clusterName string, opt
}

err = m.storage.UpdateCluster(ctx, storage.UpdateClusterRequest{Cluster: storage.Cluster{
ID: c.GetMetadata().GetClusterID(),
Name: c.GetMetadata().Name(),
MinNodeCount: c.GetMetadata().GetClusterMinNodeCount(),
ShardTotal: c.GetMetadata().GetTotalShardNum(),
EnableSchedule: opt.EnableSchedule,
TopologyType: opt.TopologyType,
CreatedAt: c.GetMetadata().GetCreateTime(),
ModifiedAt: uint64(time.Now().UnixMilli()),
ID: c.GetMetadata().GetClusterID(),
Name: c.GetMetadata().Name(),
MinNodeCount: c.GetMetadata().GetClusterMinNodeCount(),
ShardTotal: c.GetMetadata().GetTotalShardNum(),
EnableSchedule: opt.EnableSchedule,
TopologyType: opt.TopologyType,
ProcedureExecutingBatchSize: opt.ProcedureExecutingBatchSize,
CreatedAt: c.GetMetadata().GetCreateTime(),
ModifiedAt: uint64(time.Now().UnixMilli()),
}})
if err != nil {
log.Error("update cluster", zap.Error(err))
@@ -333,14 +335,15 @@ func (m *managerImpl) Start(ctx context.Context) error {
if clusterMetadata.GetStorageMetadata().TopologyType == storage.TopologyTypeUnknown {
req := storage.UpdateClusterRequest{
Cluster: storage.Cluster{
ID: metadataStorage.ID,
Name: metadataStorage.Name,
MinNodeCount: metadataStorage.MinNodeCount,
ShardTotal: metadataStorage.ShardTotal,
EnableSchedule: metadataStorage.EnableSchedule,
TopologyType: m.topologyType,
CreatedAt: metadataStorage.CreatedAt,
ModifiedAt: uint64(time.Now().UnixMilli()),
ID: metadataStorage.ID,
Name: metadataStorage.Name,
MinNodeCount: metadataStorage.MinNodeCount,
ShardTotal: metadataStorage.ShardTotal,
EnableSchedule: metadataStorage.EnableSchedule,
TopologyType: m.topologyType,
ProcedureExecutingBatchSize: metadataStorage.ProcedureExecutingBatchSize,
CreatedAt: metadataStorage.CreatedAt,
ModifiedAt: uint64(time.Now().UnixMilli()),
},
}
if err := m.storage.UpdateCluster(ctx, req); err != nil {
7 changes: 7 additions & 0 deletions server/cluster/metadata/cluster_metadata.go
@@ -576,6 +576,13 @@ func (c *ClusterMetadata) GetTopologyType() storage.TopologyType {
return c.metaData.TopologyType
}

func (c *ClusterMetadata) GetProcedureExecutingBatchSize() uint32 {
c.lock.RLock()
defer c.lock.RUnlock()

return c.metaData.ProcedureExecutingBatchSize
}

func (c *ClusterMetadata) GetCreateTime() uint64 {
c.lock.RLock()
defer c.lock.RUnlock()
16 changes: 9 additions & 7 deletions server/cluster/metadata/types.go
@@ -47,16 +47,18 @@ type ShardNodeWithVersion struct {
}

type CreateClusterOpts struct {
NodeCount uint32
ReplicationFactor uint32
ShardTotal uint32
EnableSchedule bool
TopologyType storage.TopologyType
NodeCount uint32
ReplicationFactor uint32
ShardTotal uint32
EnableSchedule bool
TopologyType storage.TopologyType
ProcedureExecutingBatchSize uint32
}

type UpdateClusterOpts struct {
EnableSchedule bool
TopologyType storage.TopologyType
EnableSchedule bool
TopologyType storage.TopologyType
ProcedureExecutingBatchSize uint32
}

type CreateTableMetadataRequest struct {
7 changes: 6 additions & 1 deletion server/config/config.go
@@ -5,6 +5,7 @@ package config
import (
"flag"
"fmt"
"math"
"os"
"strings"
"time"
@@ -52,7 +53,8 @@
defaultClusterShardTotal = 8
enableSchedule = true
// topologyType determines CeresMeta's cluster scheduling strategy. It should be chosen according to CeresDB's storage mode; the default, static, supports local storage.
defaultTopologyType = "static"
defaultTopologyType = "static"
defaultProcedureExecutingBatchSize = math.MaxUint32

defaultHTTPPort = 8080

@@ -127,6 +129,8 @@ type Config struct {
EnableSchedule bool `toml:"enable-schedule" env:"ENABLE_SCHEDULE"`
// TopologyType indicates the schedule type used by the CeresDB cluster, it will determine the strategy of CeresMeta scheduling cluster.
TopologyType string `toml:"topology-type" env:"TOPOLOGY_TYPE"`
// ProcedureExecutingBatchSize determines the maximum number of shards in a single batch when opening shards concurrently.
ProcedureExecutingBatchSize uint32 `toml:"procedure-executing-batch-size" env:"PROCEDURE_EXECUTING_BATCH_SIZE"`

ClientUrls string `toml:"client-urls" env:"CLIENT_URLS"`
PeerUrls string `toml:"peer-urls" env:"PEER_URLS"`
@@ -296,6 +300,7 @@ func MakeConfigParser() (*Parser, error) {
DefaultClusterShardTotal: defaultClusterShardTotal,
EnableSchedule: enableSchedule,
TopologyType: defaultTopologyType,
ProcedureExecutingBatchSize: defaultProcedureExecutingBatchSize,

HTTPPort: defaultHTTPPort,
}
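The new option rides on the existing TOML/env configuration surface. The key below comes from the struct tag above; the value is illustrative, since the default of math.MaxUint32 effectively leaves the batch size unbounded:

```toml
# CeresMeta config snippet (illustrative value; key name from the struct tag)
enable-schedule = true
topology-type = "static"
# Maximum number of shards opened concurrently in one batch procedure.
procedure-executing-batch-size = 32
```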
14 changes: 14 additions & 0 deletions server/coordinator/factory.go
@@ -81,6 +81,11 @@ type CreatePartitionTableRequest struct {
OnFailed func(error) error
}

type BatchRequest struct {
Batch []procedure.Procedure
BatchType procedure.Typ
}

func NewFactory(allocator id.Allocator, dispatch eventdispatch.Dispatch, storage procedure.Storage) *Factory {
return &Factory{
idAllocator: allocator,
@@ -253,6 +258,15 @@ func (f *Factory) CreateSplitProcedure(ctx context.Context, request SplitRequest
)
}

func (f *Factory) CreateBatchTransferLeaderProcedure(ctx context.Context, request BatchRequest) (procedure.Procedure, error) {
id, err := f.allocProcedureID(ctx)
if err != nil {
return nil, err
}

return transferleader.NewBatchTransferLeaderProcedure(id, request.Batch)
}

func (f *Factory) allocProcedureID(ctx context.Context) (uint64, error) {
id, err := f.idAllocator.Alloc(ctx)
if err != nil {
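For context, a sketch of how a caller might combine the new factory method with the procedure manager. Everything here except CreateBatchTransferLeaderProcedure, BatchRequest, and the Submit signature is an assumption (the package placement, helper name, and the procedure.Manager interface are hypothetical, not code from this PR):

```go
package coordinatorutil // hypothetical placement

import (
	"context"

	"github.com/CeresDB/ceresmeta/server/coordinator"
	"github.com/CeresDB/ceresmeta/server/coordinator/procedure"
)

// submitBatch wraps already-built transfer-leader sub-procedures in one
// proxy procedure so that they execute concurrently, then submits the
// proxy to the procedure manager as a single unit.
func submitBatch(ctx context.Context, f *coordinator.Factory, mgr procedure.Manager, subProcedures []procedure.Procedure) error {
	batchProcedure, err := f.CreateBatchTransferLeaderProcedure(ctx, coordinator.BatchRequest{
		Batch:     subProcedures,
		BatchType: procedure.TransferLeader,
	})
	if err != nil {
		return err
	}
	// One submission drives the whole batch; the sub-procedures fan out
	// inside Start (see batch_transfer_leader.go below).
	return mgr.Submit(ctx, batchProcedure)
}
```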
2 changes: 2 additions & 0 deletions server/coordinator/procedure/error.go
@@ -22,4 +22,6 @@ var (
ErrQueueFull = coderr.NewCodeError(coderr.Internal, "queue is full, unable to offer more data")
ErrPushDuplicatedProcedure = coderr.NewCodeError(coderr.Internal, "try to push duplicated procedure")
ErrShardNumberNotEnough = coderr.NewCodeError(coderr.Internal, "shard number not enough")
ErrEmptyBatchProcedure = coderr.NewCodeError(coderr.Internal, "procedure batch is empty")
ErrMergeBatchProcedure = coderr.NewCodeError(coderr.Internal, "failed to merge procedures batch")
)
1 change: 1 addition & 0 deletions server/coordinator/procedure/manager_impl.go
@@ -75,6 +75,7 @@ func (m *ManagerImpl) Stop(ctx context.Context) error {
return nil
}

// TODO: Filter out duplicate submitted procedures.
func (m *ManagerImpl) Submit(_ context.Context, procedure Procedure) error {
if err := m.waitingProcedures.Push(procedure, 0); err != nil {
return err
@@ -0,0 +1,130 @@
// Copyright 2023 CeresDB Project Authors. Licensed under Apache-2.0.

package transferleader

import (
"context"
"fmt"
"sync"

"github.com/CeresDB/ceresmeta/pkg/log"
"github.com/CeresDB/ceresmeta/server/coordinator/procedure"
"github.com/CeresDB/ceresmeta/server/storage"
"github.com/pkg/errors"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
)

// BatchTransferLeaderProcedure is a proxy procedure that contains a batch of TransferLeaderProcedure.
// It is used to support concurrent execution of a batch of TransferLeaderProcedure with the same related version.
type BatchTransferLeaderProcedure struct {
id uint64
batch []procedure.Procedure
relatedVersionInfo procedure.RelatedVersionInfo

// Protect the state.
lock sync.RWMutex
state procedure.State
}

func NewBatchTransferLeaderProcedure(id uint64, batch []procedure.Procedure) (procedure.Procedure, error) {
if len(batch) == 0 {
return nil, procedure.ErrEmptyBatchProcedure
}

relatedVersionInfo, err := buildBatchRelatedVersionInfo(batch)
if err != nil {
return nil, err
}

return &BatchTransferLeaderProcedure{id: id, batch: batch, state: procedure.StateInit, relatedVersionInfo: relatedVersionInfo}, nil
}

func buildBatchRelatedVersionInfo(batch []procedure.Procedure) (procedure.RelatedVersionInfo, error) {
if len(batch) == 0 {
return procedure.RelatedVersionInfo{}, nil
}

result := procedure.RelatedVersionInfo{
ClusterID: batch[0].RelatedVersionInfo().ClusterID,
ShardWithVersion: map[storage.ShardID]uint64{},
ClusterVersion: batch[0].RelatedVersionInfo().ClusterVersion,
}

// All procedures in the batch must share the same ClusterID and ClusterVersion.
for _, p := range batch {
if p.RelatedVersionInfo().ClusterID != result.ClusterID {
return procedure.RelatedVersionInfo{}, errors.WithMessage(procedure.ErrMergeBatchProcedure, "procedure clusterID in the same batch is inconsistent")
}
if p.RelatedVersionInfo().ClusterVersion != result.ClusterVersion {
return procedure.RelatedVersionInfo{}, errors.WithMessage(procedure.ErrMergeBatchProcedure, "procedure clusterVersion in the same batch is inconsistent")
}
// The ShardVersion of the same shard must be consistent.
for shardID, version := range p.RelatedVersionInfo().ShardWithVersion {
if resultVersion, exists := result.ShardWithVersion[shardID]; exists {
if version != resultVersion {
return procedure.RelatedVersionInfo{}, errors.WithMessage(procedure.ErrMergeBatchProcedure, fmt.Sprintf("procedure shardVersion in the same batch is inconsistent, shardID:%d, expectedShardVersion:%d, shardVersion:%d", shardID, resultVersion, version))
}
} else {
result.ShardWithVersion[shardID] = version
}
}
}

return result, nil
}

func (p *BatchTransferLeaderProcedure) ID() uint64 {
return p.id
}

func (p *BatchTransferLeaderProcedure) Typ() procedure.Typ {
return procedure.TransferLeader
}

func (p *BatchTransferLeaderProcedure) Start(ctx context.Context) error {
// Start the sub-procedures concurrently, one goroutine per procedure.
g, gctx := errgroup.WithContext(ctx)
for _, subProcedure := range p.batch {
subProcedure := subProcedure
g.Go(func() error {
err := subProcedure.Start(gctx)
if err != nil {
log.Error("procedure start failed", zap.Error(err), zap.String("procedure", fmt.Sprintf("%v", subProcedure)))
}
return err
})
}

if err := g.Wait(); err != nil {
p.updateStateWithLock(procedure.StateFailed)
return err
}

p.updateStateWithLock(procedure.StateFinished)
return nil
}

func (p *BatchTransferLeaderProcedure) Cancel(_ context.Context) error {
p.updateStateWithLock(procedure.StateCancelled)
return nil
}

func (p *BatchTransferLeaderProcedure) State() procedure.State {
p.lock.RLock()
defer p.lock.RUnlock()

return p.state
}

func (p *BatchTransferLeaderProcedure) RelatedVersionInfo() procedure.RelatedVersionInfo {
return p.relatedVersionInfo
}

func (p *BatchTransferLeaderProcedure) Priority() procedure.Priority {
return p.batch[0].Priority()
}

func (p *BatchTransferLeaderProcedure) updateStateWithLock(state procedure.State) {
p.lock.Lock()
defer p.lock.Unlock()

p.state = state
}
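A hypothetical test sketch for the merge rules in buildBatchRelatedVersionInfo. The stub embeds the interface so only the one method the function calls needs overriding; treating storage.ClusterID as the ClusterID field's type is an assumption:

```go
package transferleader

import (
	"testing"

	"github.com/CeresDB/ceresmeta/server/coordinator/procedure"
	"github.com/CeresDB/ceresmeta/server/storage"
)

// stubProcedure overrides only RelatedVersionInfo; the embedded interface
// satisfies the rest of procedure.Procedure (never called in this test).
type stubProcedure struct {
	procedure.Procedure
	info procedure.RelatedVersionInfo
}

func (s stubProcedure) RelatedVersionInfo() procedure.RelatedVersionInfo { return s.info }

func TestBuildBatchRelatedVersionInfo(t *testing.T) {
	mkInfo := func(shardVersion uint64) procedure.RelatedVersionInfo {
		return procedure.RelatedVersionInfo{
			ClusterID:        storage.ClusterID(1), // field type assumed
			ClusterVersion:   1,
			ShardWithVersion: map[storage.ShardID]uint64{storage.ShardID(0): shardVersion},
		}
	}

	// Two procedures referencing the same shard at different versions must be rejected.
	bad := []procedure.Procedure{stubProcedure{info: mkInfo(7)}, stubProcedure{info: mkInfo(8)}}
	if _, err := buildBatchRelatedVersionInfo(bad); err == nil {
		t.Fatal("expected ErrMergeBatchProcedure for inconsistent shard versions")
	}

	// Consistent versions merge into a single RelatedVersionInfo.
	good := []procedure.Procedure{stubProcedure{info: mkInfo(7)}, stubProcedure{info: mkInfo(7)}}
	if info, err := buildBatchRelatedVersionInfo(good); err != nil || info.ShardWithVersion[storage.ShardID(0)] != 7 {
		t.Fatalf("unexpected merge result: %+v, err: %v", info, err)
	}
}
```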