Skip to content
This repository has been archived by the owner on Feb 6, 2024. It is now read-only.

Commit

Permalink
refactor: support open shard concurrently
Browse files Browse the repository at this point in the history
  • Loading branch information
ZuLiangWang committed Jun 26, 2023
1 parent 741535c commit 3e3cc45
Show file tree
Hide file tree
Showing 11 changed files with 342 additions and 35 deletions.
14 changes: 14 additions & 0 deletions server/coordinator/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,11 @@ type CreatePartitionTableRequest struct {
OnFailed func(error) error
}

// BatchRequest describes a group of procedures that should be wrapped into one batch procedure.
type BatchRequest struct {
// Batch is the list of procedures handed to the batch procedure constructor.
Batch []procedure.Procedure
// BatchType is the procedure type of the batch.
// NOTE(review): CreateBatchTransferLeaderProcedure only reads Batch; confirm whether BatchType is consumed elsewhere.
BatchType procedure.Typ
}

func NewFactory(allocator id.Allocator, dispatch eventdispatch.Dispatch, storage procedure.Storage) *Factory {
return &Factory{
idAllocator: allocator,
Expand Down Expand Up @@ -253,6 +258,15 @@ func (f *Factory) CreateSplitProcedure(ctx context.Context, request SplitRequest
)
}

// CreateBatchTransferLeaderProcedure allocates a fresh procedure ID and wraps request.Batch
// into a single batch transfer-leader procedure.
//
// An ID allocation failure is returned as-is; otherwise the result (and error) of
// transferleader.NewBatchTransferLeaderProcedure is returned directly.
func (f *Factory) CreateBatchTransferLeaderProcedure(ctx context.Context, request BatchRequest) (procedure.Procedure, error) {
	procedureID, allocErr := f.allocProcedureID(ctx)
	if allocErr != nil {
		return nil, allocErr
	}

	return transferleader.NewBatchTransferLeaderProcedure(procedureID, request.Batch)
}

func (f *Factory) allocProcedureID(ctx context.Context) (uint64, error) {
id, err := f.idAllocator.Alloc(ctx)
if err != nil {
Expand Down
2 changes: 2 additions & 0 deletions server/coordinator/procedure/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,6 @@ var (
ErrQueueFull = coderr.NewCodeError(coderr.Internal, "queue is full, unable to offer more data")
ErrPushDuplicatedProcedure = coderr.NewCodeError(coderr.Internal, "try to push duplicated procedure")
ErrShardNumberNotEnough = coderr.NewCodeError(coderr.Internal, "shard number not enough")
ErrEmptyBatchProcedure = coderr.NewCodeError(coderr.Internal, "procedure batch is empty")
ErrMergeBatchProcedure = coderr.NewCodeError(coderr.Internal, "failed to merge procedures batch")
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
// Copyright 2023 CeresDB Project Authors. Licensed under Apache-2.0.

package transferleader

import (
"context"
"fmt"
"sync"

"github.com/CeresDB/ceresmeta/pkg/log"
"github.com/CeresDB/ceresmeta/server/coordinator/procedure"
"github.com/CeresDB/ceresmeta/server/storage"
"github.com/pkg/errors"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
)

// BatchTransferLeaderProcedure is a proxy procedure that contains a batch of TransferLeaderProcedure.
// It is used to support concurrent execution of a batch of TransferLeaderProcedure with the same version.
type BatchTransferLeaderProcedure struct {
// id is the identifier returned by ID().
id uint64
// batch holds the wrapped procedures; Start runs each of them in its own goroutine.
batch []procedure.Procedure
// relatedVersionInfo is the version info merged from all procedures in batch at construction.
relatedVersionInfo procedure.RelatedVersionInfo

// lock protects state.
lock sync.RWMutex
state procedure.State
}

// NewBatchTransferLeaderProcedure wraps batch into a single procedure identified by id.
//
// It returns procedure.ErrEmptyBatchProcedure when batch is empty, and propagates the error
// from buildBatchRelatedVersionInfo when the version info of the procedures cannot be merged.
// The returned procedure starts in procedure.StateInit.
func NewBatchTransferLeaderProcedure(id uint64, batch []procedure.Procedure) (procedure.Procedure, error) {
	if len(batch) == 0 {
		return nil, procedure.ErrEmptyBatchProcedure
	}

	mergedInfo, err := buildBatchRelatedVersionInfo(batch)
	if err != nil {
		return nil, err
	}

	batchProcedure := &BatchTransferLeaderProcedure{
		id:                 id,
		batch:              batch,
		relatedVersionInfo: mergedInfo,
		state:              procedure.StateInit,
	}
	return batchProcedure, nil
}

// buildBatchRelatedVersionInfo merges the RelatedVersionInfo of every procedure in batch.
//
// All procedures must share the same ClusterID and ClusterVersion, and a shard referenced by
// more than one procedure must carry the same version in each of them; otherwise an error
// wrapping procedure.ErrMergeBatchProcedure is returned. An empty batch yields a zero
// RelatedVersionInfo and a nil error.
func buildBatchRelatedVersionInfo(batch []procedure.Procedure) (procedure.RelatedVersionInfo, error) {
	if len(batch) == 0 {
		return procedure.RelatedVersionInfo{}, nil
	}

	// The first procedure provides the reference cluster ID and cluster version.
	first := batch[0].RelatedVersionInfo()
	result := procedure.RelatedVersionInfo{
		ClusterID:        first.ClusterID,
		ShardWithVersion: map[storage.ShardID]uint64{},
		ClusterVersion:   first.ClusterVersion,
	}

	// The version of this batch of procedures must be the same.
	for _, p := range batch {
		// Fetch the info once per procedure instead of once per field access:
		// RelatedVersionInfo is an interface call that returns a struct by value.
		info := p.RelatedVersionInfo()

		if info.ClusterID != result.ClusterID {
			return procedure.RelatedVersionInfo{}, errors.WithMessage(procedure.ErrMergeBatchProcedure, "procedure clusterID in the same batch is inconsistent")
		}
		if info.ClusterVersion != result.ClusterVersion {
			return procedure.RelatedVersionInfo{}, errors.WithMessage(procedure.ErrMergeBatchProcedure, "procedure clusterVersion in the same batch is inconsistent")
		}

		// The ShardVersion of the same shard must be consistent.
		for shardID, version := range info.ShardWithVersion {
			resultVersion, exists := result.ShardWithVersion[shardID]
			if !exists {
				result.ShardWithVersion[shardID] = version
				continue
			}
			if version != resultVersion {
				return procedure.RelatedVersionInfo{}, errors.WithMessage(procedure.ErrMergeBatchProcedure, "procedure shardVersion in the same batch is inconsistent")
			}
		}
	}

	return result, nil
}

// ID returns the identifier given to this batch procedure at construction.
func (p *BatchTransferLeaderProcedure) ID() uint64 {
return p.id
}

// Typ always reports procedure.TransferLeader, regardless of the types of the wrapped procedures.
func (p *BatchTransferLeaderProcedure) Typ() procedure.Typ {
return procedure.TransferLeader
}

// Start runs every procedure in the batch concurrently, one goroutine each, and waits for all
// of them to return.
//
// Each procedure receives the caller's ctx; the context derived by errgroup is discarded, so a
// failing procedure does not cancel the context seen by the others. Every failure is logged.
// If any procedure fails, the state becomes StateFailed and the error reported by the group is
// returned; otherwise the state becomes StateFinished and nil is returned.
func (p *BatchTransferLeaderProcedure) Start(ctx context.Context) error {
	group, _ := errgroup.WithContext(ctx)
	for _, sub := range p.batch {
		// Copy the loop variable so each goroutine captures its own procedure.
		sub := sub
		group.Go(func() error {
			startErr := sub.Start(ctx)
			if startErr != nil {
				log.Error("procedure start failed", zap.Error(startErr), zap.String("procedure", fmt.Sprintf("%v", sub)))
			}
			return startErr
		})
	}

	// Wait blocks until all goroutines have returned and yields the first non-nil error, if any.
	err := group.Wait()

	finalState := procedure.StateFinished
	if err != nil {
		finalState = procedure.StateFailed
	}
	p.updateStateWithLock(finalState)

	return err
}

// Cancel marks the batch procedure as cancelled and always returns nil.
// It only updates the batch state; the wrapped procedures are not cancelled here.
func (p *BatchTransferLeaderProcedure) Cancel(_ context.Context) error {
p.updateStateWithLock(procedure.StateCancelled)
return nil
}

// State returns the current state of the batch procedure.
//
// The state is written under p.lock by Start and Cancel, so it is read under the read lock
// here to avoid a data race when State is called from another goroutine.
func (p *BatchTransferLeaderProcedure) State() procedure.State {
	p.lock.RLock()
	defer p.lock.RUnlock()

	return p.state
}

// RelatedVersionInfo returns the version info that was merged from all wrapped procedures at construction.
func (p *BatchTransferLeaderProcedure) RelatedVersionInfo() procedure.RelatedVersionInfo {
return p.relatedVersionInfo
}

// Priority returns the priority of the first procedure in the batch.
// NewBatchTransferLeaderProcedure rejects an empty batch, so batch[0] exists for instances built through it.
func (p *BatchTransferLeaderProcedure) Priority() procedure.Priority {
return p.batch[0].Priority()
}

// updateStateWithLock replaces the procedure state while holding the write lock.
func (p *BatchTransferLeaderProcedure) updateStateWithLock(state procedure.State) {
	p.lock.Lock()
	p.state = state
	p.lock.Unlock()
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
// Copyright 2023 CeresDB Project Authors. Licensed under Apache-2.0.

package transferleader_test

import (
"context"
"testing"

"github.com/CeresDB/ceresmeta/server/coordinator/procedure"
"github.com/CeresDB/ceresmeta/server/coordinator/procedure/operation/transferleader"
"github.com/CeresDB/ceresmeta/server/storage"
"github.com/stretchr/testify/require"
)

// mockProcedure is a minimal procedure.Procedure implementation for tests.
// Its type and version info come from its fields; every other method returns a fixed value.
type mockProcedure struct {
	ClusterID        storage.ClusterID
	clusterVersion   uint64
	typ              procedure.Typ
	ShardWithVersion map[storage.ShardID]uint64
}

// ID always returns 0.
func (mockProcedure) ID() uint64 {
	return 0
}

// Typ returns the type configured on the mock.
func (m mockProcedure) Typ() procedure.Typ {
	return m.typ
}

// Start does nothing and always succeeds.
func (mockProcedure) Start(context.Context) error {
	return nil
}

// Cancel does nothing and always succeeds.
func (mockProcedure) Cancel(context.Context) error {
	return nil
}

// State always reports procedure.StateInit.
func (mockProcedure) State() procedure.State {
	return procedure.StateInit
}

// RelatedVersionInfo assembles the version info from the mock's fields.
func (m mockProcedure) RelatedVersionInfo() procedure.RelatedVersionInfo {
	var info procedure.RelatedVersionInfo
	info.ClusterID = m.ClusterID
	info.ShardWithVersion = m.ShardWithVersion
	info.ClusterVersion = m.clusterVersion
	return info
}

// Priority always reports procedure.PriorityLow.
func (mockProcedure) Priority() procedure.Priority {
	return procedure.PriorityLow
}

// TestBatchProcedure checks which batches NewBatchTransferLeaderProcedure accepts.
//
// NOTE(review): every case appends to the same slice and the slice is never reset, so each
// later case still contains the procedures of all earlier ones. Once the "different clusterID"
// procedures are in the slice, the remaining cases fail on that mismatch alone; in particular
// the constructor performs no type check, so the "different type" case is not exercising one.
func TestBatchProcedure(t *testing.T) {
	re := require.New(t)

	var procedures []procedure.Procedure

	// appendThree appends the mocks produced by build(0), build(1), build(2) to procedures.
	appendThree := func(build func(idx int) mockProcedure) {
		for idx := 0; idx < 3; idx++ {
			procedures = append(procedures, build(idx))
		}
	}

	// Procedures with same type and version.
	appendThree(func(idx int) mockProcedure {
		return mockProcedure{
			ClusterID:        0,
			clusterVersion:   0,
			typ:              0,
			ShardWithVersion: map[storage.ShardID]uint64{storage.ShardID(idx): 0},
		}
	})
	_, err := transferleader.NewBatchTransferLeaderProcedure(0, procedures)
	re.NoError(err)

	// Procedure with different clusterID
	appendThree(func(idx int) mockProcedure {
		return mockProcedure{
			ClusterID:        storage.ClusterID(idx),
			clusterVersion:   0,
			typ:              procedure.TransferLeader,
			ShardWithVersion: map[storage.ShardID]uint64{storage.ShardID(idx): 0},
		}
	})
	_, err = transferleader.NewBatchTransferLeaderProcedure(0, procedures)
	re.Error(err)

	// Procedures with different type.
	appendThree(func(idx int) mockProcedure {
		return mockProcedure{
			ClusterID:        0,
			clusterVersion:   0,
			typ:              procedure.Typ(idx),
			ShardWithVersion: map[storage.ShardID]uint64{storage.ShardID(idx): 0},
		}
	})
	_, err = transferleader.NewBatchTransferLeaderProcedure(0, procedures)
	re.Error(err)

	// Procedures with different version.
	appendThree(func(idx int) mockProcedure {
		return mockProcedure{
			ClusterID:        0,
			clusterVersion:   0,
			typ:              procedure.Typ(idx),
			ShardWithVersion: map[storage.ShardID]uint64{storage.ShardID(0): uint64(idx)},
		}
	})
	_, err = transferleader.NewBatchTransferLeaderProcedure(0, procedures)
	re.Error(err)
}
34 changes: 25 additions & 9 deletions server/coordinator/scheduler/assign_shard_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,15 @@ package scheduler

import (
"context"
"fmt"
"strings"

"github.com/CeresDB/ceresmeta/server/cluster/metadata"
"github.com/CeresDB/ceresmeta/server/coordinator"
"github.com/CeresDB/ceresmeta/server/coordinator/procedure"
"github.com/CeresDB/ceresmeta/server/storage"
)

const (
AssignReason = "ShardView exists in metadata but shardNode not exists, assign shard to node"
)

// AssignShardScheduler is used to assign shards that have no node mapping.
type AssignShardScheduler struct {
factory *coordinator.Factory
Expand All @@ -32,6 +31,8 @@ func (a AssignShardScheduler) Schedule(ctx context.Context, clusterSnapshot meta
return ScheduleResult{}, nil
}

var procedures []procedure.Procedure
var reasons strings.Builder
// Check whether there is a shard without node mapping.
for _, shardView := range clusterSnapshot.Topology.ShardViewsMapping {
_, exists := findNodeByShard(shardView.ShardID, clusterSnapshot.Topology.ClusterView.ShardNodes)
Expand All @@ -52,12 +53,27 @@ func (a AssignShardScheduler) Schedule(ctx context.Context, clusterSnapshot meta
if err != nil {
return ScheduleResult{}, err
}
return ScheduleResult{
Procedure: p,
Reason: AssignReason,
}, nil

procedures = append(procedures, p)
reasons.WriteString(fmt.Sprintf("the shard:%d is not assigned to any node, try to assign it to node:%s", shardView.ShardID, newLeaderNode.Node.Name))
}
return ScheduleResult{}, nil

if len(procedures) == 0 {
return ScheduleResult{}, nil
}

batchProcedure, err := a.factory.CreateBatchTransferLeaderProcedure(ctx, coordinator.BatchRequest{
Batch: procedures,
BatchType: procedure.TransferLeader,
})
if err != nil {
return ScheduleResult{}, err
}

return ScheduleResult{
batchProcedure,
reasons.String(),
}, nil
}

func findNodeByShard(shardID storage.ShardID, shardNodes []storage.ShardNode) (storage.ShardNode, bool) {
Expand Down
6 changes: 3 additions & 3 deletions server/coordinator/scheduler/assign_shard_scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,17 +25,17 @@ func TestAssignSchedule(t *testing.T) {
emptyCluster := test.InitEmptyCluster(ctx, t)
result, err := s.Schedule(ctx, emptyCluster.GetMetadata().GetClusterSnapshot())
re.NoError(err)
re.Nil(result.Procedure)
re.Empty(result)

// PrepareCluster would be scheduled a transfer leader procedure.
prepareCluster := test.InitPrepareCluster(ctx, t)
result, err = s.Schedule(ctx, prepareCluster.GetMetadata().GetClusterSnapshot())
re.NoError(err)
re.NotNil(result.Procedure)
re.NotEmpty(result)

// StableCluster with all shards assigned would be scheduled an empty procedure.
stableCluster := test.InitStableCluster(ctx, t)
result, err = s.Schedule(ctx, stableCluster.GetMetadata().GetClusterSnapshot())
re.NoError(err)
re.Nil(result.Procedure)
re.Empty(result)
}
Loading

0 comments on commit 3e3cc45

Please sign in to comment.