refactor: support open shard concurrently
ZuLiangWang committed Jun 25, 2023
1 parent 741535c commit de5607e
Showing 10 changed files with 219 additions and 35 deletions.
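
Across the scheduler changes below, the pattern is the same: rather than returning as soon as the first eligible procedure is built, each scheduler now collects one procedure (plus a human-readable reason) per shard, wraps the whole set into a single batch through the new Factory.CreateBatchProcedure, and returns that batch as its ScheduleResult. BatchProcedure.Start then runs the sub-procedures in parallel, which is what allows shards to be opened concurrently. A minimal sketch of the calling pattern, assuming the caller already holds a *coordinator.Factory and a slice of transfer-leader procedures (the batchTransferLeaders helper is illustrative and not part of this commit):

package example // illustrative only

import (
	"context"

	"github.com/CeresDB/ceresmeta/server/coordinator"
	"github.com/CeresDB/ceresmeta/server/coordinator/procedure"
)

// batchTransferLeaders wraps a set of transfer-leader procedures into one batch
// procedure so that they are started concurrently by BatchProcedure.Start.
func batchTransferLeaders(ctx context.Context, factory *coordinator.Factory, procedures []procedure.Procedure) (procedure.Procedure, error) {
	// Callers return an empty ScheduleResult when there is nothing to do.
	if len(procedures) == 0 {
		return nil, nil
	}
	return factory.CreateBatchProcedure(ctx, coordinator.BatchRequest{
		Batch:     procedures,
		BatchType: procedure.TransferLeader,
	})
}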
14 changes: 14 additions & 0 deletions server/coordinator/factory.go
@@ -81,6 +81,11 @@ type CreatePartitionTableRequest struct {
OnFailed func(error) error
}

type BatchRequest struct {
Batch []procedure.Procedure
BatchType procedure.Typ
}

func NewFactory(allocator id.Allocator, dispatch eventdispatch.Dispatch, storage procedure.Storage) *Factory {
return &Factory{
idAllocator: allocator,
@@ -253,6 +258,15 @@ func (f *Factory) CreateSplitProcedure(ctx context.Context, request SplitRequest
)
}

func (f *Factory) CreateBatchProcedure(ctx context.Context, request BatchRequest) (procedure.Procedure, error) {
id, err := f.allocProcedureID(ctx)
if err != nil {
return nil, err
}

return procedure.NewBatchProcedure(id, request.Batch, request.BatchType)
}

func (f *Factory) allocProcedureID(ctx context.Context) (uint64, error) {
id, err := f.idAllocator.Alloc(ctx)
if err != nil {
125 changes: 125 additions & 0 deletions server/coordinator/procedure/batch_procedure.go
@@ -0,0 +1,125 @@
// Copyright 2023 CeresDB Project Authors. Licensed under Apache-2.0.

package procedure

import (
"context"
"fmt"
"sync"

"github.com/CeresDB/ceresmeta/pkg/log"
"github.com/CeresDB/ceresmeta/server/storage"
"github.com/pkg/errors"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
)

type BatchProcedure struct {
id uint64
typ Typ
batch []Procedure
relatedVersionInfo RelatedVersionInfo

// Protect the state.
lock sync.RWMutex
state State
}

func NewBatchProcedure(id uint64, batch []Procedure, typ Typ) (Procedure, error) {
if len(batch) == 0 {
return nil, ErrEmptyBatchProcedure
}

relateVersionInfo, err := buildRelatedVersionInfo(batch, typ)
if err != nil {
return nil, err
}

return &BatchProcedure{id: id, batch: batch, state: StateInit, typ: typ, relatedVersionInfo: relateVersionInfo}, nil
}

func buildRelatedVersionInfo(batch []Procedure, typ Typ) (RelatedVersionInfo, error) {
if len(batch) == 0 {
return RelatedVersionInfo{}, nil
}

result := RelatedVersionInfo{
ClusterID: batch[0].RelatedVersionInfo().ClusterID,
ShardWithVersion: map[storage.ShardID]uint64{},
ClusterVersion: batch[0].RelatedVersionInfo().ClusterVersion,
}

// The version of this batch of procedures must be the same.
for _, p := range batch {
if p.Typ() != typ {
return RelatedVersionInfo{}, errors.WithMessage(ErrMergeBatchProcedure, "procedure type in the same batch is inconsistent")
}
if p.RelatedVersionInfo().ClusterID != result.ClusterID {
return RelatedVersionInfo{}, errors.WithMessage(ErrMergeBatchProcedure, "procedure clusterID in the same batch is inconsistent")
}
if p.RelatedVersionInfo().ClusterVersion != result.ClusterVersion {
return RelatedVersionInfo{}, errors.WithMessage(ErrMergeBatchProcedure, "procedure clusterVersion in the same batch is inconsistent")
}
// The ShardVersion of the same shard must be consistent.
for shardID, version := range p.RelatedVersionInfo().ShardWithVersion {
if resultVersion, exists := result.ShardWithVersion[shardID]; exists {
if version != resultVersion {
return RelatedVersionInfo{}, errors.WithMessage(ErrMergeBatchProcedure, "procedure shardVersion in the same batch is inconsistent")
}
} else {
result.ShardWithVersion[shardID] = version
}
}
}

return result, nil
}

func (p *BatchProcedure) ID() uint64 {
return p.id
}

func (p *BatchProcedure) Typ() Typ {
return p.typ
}

func (p *BatchProcedure) Start(ctx context.Context) error {
// Start procedures with multiple goroutines.
g, _ := errgroup.WithContext(ctx)
for _, p := range p.batch {
p := p
g.Go(func() error {
err := p.Start(ctx)
if err != nil {
log.Error("procedure start failed", zap.Error(err), zap.String("procedure", fmt.Sprintf("%v", p)))
}
return err
})
}

return g.Wait()
}

func (p *BatchProcedure) Cancel(_ context.Context) error {
p.updateStateWithLock(StateCancelled)
return nil
}

func (p *BatchProcedure) State() State {
return p.state
}

func (p *BatchProcedure) RelatedVersionInfo() RelatedVersionInfo {
return p.relatedVersionInfo
}

func (p *BatchProcedure) Priority() Priority {
return p.batch[0].Priority()
}

func (p *BatchProcedure) updateStateWithLock(state State) {
p.lock.Lock()
defer p.lock.Unlock()

p.state = state
}
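
The consistency checks in buildRelatedVersionInfo boil down to three rules: every procedure in the batch must share the same type, cluster ID, and cluster version, and when two procedures touch the same shard they must agree on its version. The shard-version rule in isolation, as a standalone sketch (plain integer keys stand in for storage.ShardID; this snippet is illustrative and not part of the commit):

package example

import "fmt"

// mergeShardVersions mirrors the shard-version rule of buildRelatedVersionInfo:
// a shard may appear in several procedures, but always with the same version.
func mergeShardVersions(perProcedure []map[uint32]uint64) (map[uint32]uint64, error) {
	merged := map[uint32]uint64{}
	for _, shardVersions := range perProcedure {
		for shardID, version := range shardVersions {
			if existing, ok := merged[shardID]; ok && existing != version {
				return nil, fmt.Errorf("shard %d has inconsistent versions: %d vs %d", shardID, existing, version)
			}
			merged[shardID] = version
		}
	}
	return merged, nil
}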
2 changes: 2 additions & 0 deletions server/coordinator/procedure/error.go
@@ -22,4 +22,6 @@ var (
ErrQueueFull = coderr.NewCodeError(coderr.Internal, "queue is full, unable to offer more data")
ErrPushDuplicatedProcedure = coderr.NewCodeError(coderr.Internal, "try to push duplicated procedure")
ErrShardNumberNotEnough = coderr.NewCodeError(coderr.Internal, "shard number not enough")
ErrEmptyBatchProcedure = coderr.NewCodeError(coderr.Internal, "procedure batch is empty")
ErrMergeBatchProcedure = coderr.NewCodeError(coderr.Internal, "failed to merge procedures batch")
)
34 changes: 25 additions & 9 deletions server/coordinator/scheduler/assign_shard_scheduler.go
@@ -4,16 +4,15 @@ package scheduler

import (
"context"
"fmt"
"strings"

"github.com/CeresDB/ceresmeta/server/cluster/metadata"
"github.com/CeresDB/ceresmeta/server/coordinator"
"github.com/CeresDB/ceresmeta/server/coordinator/procedure"
"github.com/CeresDB/ceresmeta/server/storage"
)

const (
AssignReason = "ShardView exists in metadata but shardNode not exists, assign shard to node"
)

// AssignShardScheduler used to assigning shards without nodes.
type AssignShardScheduler struct {
factory *coordinator.Factory
@@ -32,6 +31,8 @@ func (a AssignShardScheduler) Schedule(ctx context.Context, clusterSnapshot meta
return ScheduleResult{}, nil
}

var procedures []procedure.Procedure
var reasons strings.Builder
// Check whether there is a shard without node mapping.
for _, shardView := range clusterSnapshot.Topology.ShardViewsMapping {
_, exists := findNodeByShard(shardView.ShardID, clusterSnapshot.Topology.ClusterView.ShardNodes)
@@ -52,12 +53,27 @@ func (a AssignShardScheduler) Schedule(ctx context.Context, clusterSnapshot meta
if err != nil {
return ScheduleResult{}, err
}
return ScheduleResult{
Procedure: p,
Reason: AssignReason,
}, nil

procedures = append(procedures, p)
reasons.WriteString(fmt.Sprintf("the shard:%d is not assigned to any node, try to assign it to node:%s", shardView.ShardID, newLeaderNode.Node.Name))
}
return ScheduleResult{}, nil

if len(procedures) == 0 {
return ScheduleResult{}, nil
}

batchProcedure, err := a.factory.CreateBatchProcedure(ctx, coordinator.BatchRequest{
Batch: procedures,
BatchType: procedure.TransferLeader,
})
if err != nil {
return ScheduleResult{}, err
}

return ScheduleResult{
batchProcedure,
reasons.String(),
}, nil
}

func findNodeByShard(shardID storage.ShardID, shardNodes []storage.ShardNode) (storage.ShardNode, bool) {
6 changes: 3 additions & 3 deletions server/coordinator/scheduler/assign_shard_scheduler_test.go
@@ -25,17 +25,17 @@ func TestAssignSchedule(t *testing.T) {
emptyCluster := test.InitEmptyCluster(ctx, t)
result, err := s.Schedule(ctx, emptyCluster.GetMetadata().GetClusterSnapshot())
re.NoError(err)
re.Nil(result.Procedure)
re.Empty(result)

// PrepareCluster would be scheduled a transfer leader procedure.
prepareCluster := test.InitPrepareCluster(ctx, t)
result, err = s.Schedule(ctx, prepareCluster.GetMetadata().GetClusterSnapshot())
re.NoError(err)
re.NotNil(result.Procedure)
re.NotEmpty(result)

// StableCluster with all shards assigned would be scheduled an empty procedure.
stableCluster := test.InitStableCluster(ctx, t)
result, err = s.Schedule(ctx, stableCluster.GetMetadata().GetClusterSnapshot())
re.NoError(err)
re.Nil(result.Procedure)
re.Empty(result)
}
24 changes: 19 additions & 5 deletions server/coordinator/scheduler/rebalanced_shard_scheduler.go
@@ -5,9 +5,11 @@ package scheduler
import (
"context"
"fmt"
"strings"

"github.com/CeresDB/ceresmeta/server/cluster/metadata"
"github.com/CeresDB/ceresmeta/server/coordinator"
"github.com/CeresDB/ceresmeta/server/coordinator/procedure"
"go.uber.org/zap"
)

@@ -31,6 +33,8 @@ func (r RebalancedShardScheduler) Schedule(ctx context.Context, clusterSnapshot
return ScheduleResult{}, nil
}

var procedures []procedure.Procedure
var reasons strings.Builder
// TODO: Improve scheduling efficiency and verify whether the topology changes.
for _, shardNode := range clusterSnapshot.Topology.ClusterView.ShardNodes {
node, err := r.nodePicker.PickNode(ctx, shardNode.ID, clusterSnapshot.RegisteredNodes)
@@ -48,12 +52,22 @@ func (r RebalancedShardScheduler) Schedule(ctx context.Context, clusterSnapshot
if err != nil {
return ScheduleResult{}, err
}
return ScheduleResult{
Procedure: p,
Reason: fmt.Sprintf("the shard:%d on the node:%s does not meet the balance requirements,it should be assigned to node:%s", shardNode.ID, shardNode.NodeName, node.Node.Name),
}, nil
procedures = append(procedures, p)
reasons.WriteString(fmt.Sprintf("the shard:%d on the node:%s does not meet the balance requirements,it should be assigned to node:%s \n", shardNode.ID, shardNode.NodeName, node.Node.Name))
}
}

return ScheduleResult{}, nil
if len(procedures) == 0 {
return ScheduleResult{}, nil
}

batchProcedure, err := r.factory.CreateBatchProcedure(ctx, coordinator.BatchRequest{
Batch: procedures,
BatchType: procedure.TransferLeader,
})
if err != nil {
return ScheduleResult{}, err
}

return ScheduleResult{batchProcedure, reasons.String()}, nil
}
server/coordinator/scheduler/rebalanced_shard_scheduler_test.go
@@ -25,16 +25,16 @@ func TestRebalancedScheduler(t *testing.T) {
emptyCluster := test.InitEmptyCluster(ctx, t)
result, err := s.Schedule(ctx, emptyCluster.GetMetadata().GetClusterSnapshot())
re.NoError(err)
re.Nil(result.Procedure)
re.Empty(result)

// PrepareCluster would be scheduled an empty procedure.
prepareCluster := test.InitPrepareCluster(ctx, t)
result, err = s.Schedule(ctx, prepareCluster.GetMetadata().GetClusterSnapshot())
re.NoError(err)
re.Nil(result.Procedure)
re.Empty(result)

// StableCluster with all shards assigned would be scheduled a load balance procedure.
stableCluster := test.InitStableCluster(ctx, t)
result, err = s.Schedule(ctx, stableCluster.GetMetadata().GetClusterSnapshot())
_, err = s.Schedule(ctx, stableCluster.GetMetadata().GetClusterSnapshot())
re.NoError(err)
}
6 changes: 3 additions & 3 deletions server/coordinator/scheduler/scheduler_manager.go
@@ -213,16 +213,16 @@ func (m *ManagerImpl) ListScheduler() []Scheduler {

func (m *ManagerImpl) Scheduler(ctx context.Context, clusterSnapshot metadata.Snapshot) []ScheduleResult {
// TODO: Every scheduler should run in an independent goroutine.
results := make([]ScheduleResult, 0, len(m.registerSchedulers))
allResults := make([]ScheduleResult, 0, len(m.registerSchedulers))
for _, scheduler := range m.registerSchedulers {
result, err := scheduler.Schedule(ctx, clusterSnapshot)
if err != nil {
m.logger.Error("scheduler failed", zap.Error(err))
continue
}
results = append(results, result)
allResults = append(allResults, result)
}
return results
return allResults
}

func (m *ManagerImpl) UpdateEnableSchedule(_ context.Context, enableSchedule bool) {
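
Since schedulers now return either a batch procedure or a zero-value ScheduleResult, a consumer of this slice needs to skip the empty entries before submitting anything. A hypothetical sketch of that consumption step (the applyResults helper and the submit callback are assumptions; this commit does not change how procedures are submitted):

package scheduler // sketch written as if it lived alongside ScheduleResult

import "github.com/CeresDB/ceresmeta/server/coordinator/procedure"

// applyResults submits every non-empty schedule result. The submit callback is
// a stand-in for whatever procedure manager the caller uses.
func applyResults(results []ScheduleResult, submit func(procedure.Procedure) error) error {
	for _, result := range results {
		// Schedulers return a zero-value ScheduleResult when nothing needs scheduling.
		if result.Procedure == nil {
			continue
		}
		if err := submit(result.Procedure); err != nil {
			return err
		}
	}
	return nil
}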
31 changes: 22 additions & 9 deletions server/coordinator/scheduler/static_topology_shard_scheduler.go
@@ -5,10 +5,12 @@ package scheduler
import (
"context"
"fmt"
"strings"
"time"

"github.com/CeresDB/ceresmeta/server/cluster/metadata"
"github.com/CeresDB/ceresmeta/server/coordinator"
"github.com/CeresDB/ceresmeta/server/coordinator/procedure"
"github.com/CeresDB/ceresmeta/server/storage"
"github.com/pkg/errors"
)
@@ -23,6 +25,9 @@ func NewStaticTopologyShardScheduler(factory *coordinator.Factory, nodePicker co
}

func (s *StaticTopologyShardScheduler) Schedule(ctx context.Context, clusterSnapshot metadata.Snapshot) (ScheduleResult, error) {
var procedures []procedure.Procedure
var reasons strings.Builder

switch clusterSnapshot.Topology.ClusterView.State {
case storage.ClusterStateEmpty:
return ScheduleResult{}, nil
@@ -47,10 +52,8 @@
if err != nil {
return ScheduleResult{}, err
}
return ScheduleResult{
Procedure: p,
Reason: fmt.Sprintf("Cluster initialization, shard:%d is assigned to node:%s", shardView.ShardID, newLeaderNode.Node.Name),
}, nil
procedures = append(procedures, p)
reasons.WriteString(fmt.Sprintf("Cluster initialization, shard:%d is assigned to node:%s \n", shardView.ShardID, newLeaderNode.Node.Name))
}
case storage.ClusterStateStable:
for i := 0; i < len(clusterSnapshot.Topology.ClusterView.ShardNodes); i++ {
@@ -70,15 +73,25 @@ func (s *StaticTopologyShardScheduler) Schedule(ctx context.Context, clusterSnap
if err != nil {
return ScheduleResult{}, err
}
return ScheduleResult{
Procedure: p,
Reason: fmt.Sprintf("Cluster state is stable, shard:%d is reopened in node:%s", shardNode.ID, node.Node.Name),
}, nil
procedures = append(procedures, p)
reasons.WriteString(fmt.Sprintf("Cluster initialization, shard:%d is assigned to node:%s \n", shardNode.ID, node.Node.Name))
}
}
}

return ScheduleResult{}, nil
if len(procedures) == 0 {
return ScheduleResult{}, nil
}

batchProcedure, err := s.factory.CreateBatchProcedure(ctx, coordinator.BatchRequest{
Batch: procedures,
BatchType: procedure.TransferLeader,
})
if err != nil {
return ScheduleResult{}, err
}

return ScheduleResult{batchProcedure, reasons.String()}, nil
}

func findOnlineNodeByName(nodeName string, nodes []metadata.RegisteredNode) (metadata.RegisteredNode, error) {