From 3e3cc45aac882cf61ae7a47a645bffaa2a0d77d3 Mon Sep 17 00:00:00 2001
From: CooooolFrog
Date: Tue, 13 Jun 2023 15:52:25 +0800
Subject: [PATCH] refactor: support open shard concurrently

---
 server/coordinator/factory.go                 |  14 ++
 server/coordinator/procedure/error.go         |   2 +
 .../transferleader/batch_transfer_leader.go   | 130 ++++++++++++++++++
 .../batch_transfer_leader_test.go             | 118 ++++++++++++++++
 .../scheduler/assign_shard_scheduler.go       |  34 +++--
 .../scheduler/assign_shard_scheduler_test.go  |   6 +-
 .../scheduler/rebalanced_shard_scheduler.go   |  24 +++-
 .../rebalanced_shard_scheduler_test.go        |   6 +-
 .../scheduler/scheduler_manager.go            |   6 +-
 .../static_topology_shard_scheduler.go        |  31 +++--
 .../static_topology_shard_scheduler_test.go   |   6 +-
 11 files changed, 342 insertions(+), 35 deletions(-)
 create mode 100644 server/coordinator/procedure/operation/transferleader/batch_transfer_leader.go
 create mode 100644 server/coordinator/procedure/operation/transferleader/batch_transfer_leader_test.go

diff --git a/server/coordinator/factory.go b/server/coordinator/factory.go
index 8e46d870..c5f0ec07 100644
--- a/server/coordinator/factory.go
+++ b/server/coordinator/factory.go
@@ -81,6 +81,11 @@ type CreatePartitionTableRequest struct {
     OnFailed func(error) error
 }
 
+type BatchRequest struct {
+    Batch     []procedure.Procedure
+    BatchType procedure.Typ
+}
+
 func NewFactory(allocator id.Allocator, dispatch eventdispatch.Dispatch, storage procedure.Storage) *Factory {
     return &Factory{
         idAllocator: allocator,
@@ -253,6 +258,15 @@ func (f *Factory) CreateSplitProcedure(ctx context.Context, request SplitRequest
     )
 }
 
+func (f *Factory) CreateBatchTransferLeaderProcedure(ctx context.Context, request BatchRequest) (procedure.Procedure, error) {
+    id, err := f.allocProcedureID(ctx)
+    if err != nil {
+        return nil, err
+    }
+
+    return transferleader.NewBatchTransferLeaderProcedure(id, request.Batch)
+}
+
 func (f *Factory) allocProcedureID(ctx context.Context) (uint64, error) {
     id, err := f.idAllocator.Alloc(ctx)
     if err != nil {
diff --git a/server/coordinator/procedure/error.go b/server/coordinator/procedure/error.go
index 44c2f5bb..29a8864f 100644
--- a/server/coordinator/procedure/error.go
+++ b/server/coordinator/procedure/error.go
@@ -22,4 +22,6 @@ var (
     ErrQueueFull               = coderr.NewCodeError(coderr.Internal, "queue is full, unable to offer more data")
     ErrPushDuplicatedProcedure = coderr.NewCodeError(coderr.Internal, "try to push duplicated procedure")
     ErrShardNumberNotEnough    = coderr.NewCodeError(coderr.Internal, "shard number not enough")
+    ErrEmptyBatchProcedure     = coderr.NewCodeError(coderr.Internal, "procedure batch is empty")
+    ErrMergeBatchProcedure     = coderr.NewCodeError(coderr.Internal, "failed to merge procedures batch")
 )
diff --git a/server/coordinator/procedure/operation/transferleader/batch_transfer_leader.go b/server/coordinator/procedure/operation/transferleader/batch_transfer_leader.go
new file mode 100644
index 00000000..4aa4f54f
--- /dev/null
+++ b/server/coordinator/procedure/operation/transferleader/batch_transfer_leader.go
@@ -0,0 +1,130 @@
+// Copyright 2023 CeresDB Project Authors. Licensed under Apache-2.0.
+
+package transferleader
+
+import (
+    "context"
+    "fmt"
+    "sync"
+
+    "github.com/CeresDB/ceresmeta/pkg/log"
+    "github.com/CeresDB/ceresmeta/server/coordinator/procedure"
+    "github.com/CeresDB/ceresmeta/server/storage"
+    "github.com/pkg/errors"
+    "go.uber.org/zap"
+    "golang.org/x/sync/errgroup"
+)
+
+// BatchTransferLeaderProcedure is a proxy procedure that contains a batch of TransferLeaderProcedure.
+// It is used to support concurrent execution of a batch of TransferLeaderProcedure with the same version.
+type BatchTransferLeaderProcedure struct {
+    id    uint64
+    batch []procedure.Procedure
+    relatedVersionInfo procedure.RelatedVersionInfo
+
+    // Protect the state.
+    lock  sync.RWMutex
+    state procedure.State
+}
+
+func NewBatchTransferLeaderProcedure(id uint64, batch []procedure.Procedure) (procedure.Procedure, error) {
+    if len(batch) == 0 {
+        return nil, procedure.ErrEmptyBatchProcedure
+    }
+
+    relateVersionInfo, err := buildBatchRelatedVersionInfo(batch)
+    if err != nil {
+        return nil, err
+    }
+
+    return &BatchTransferLeaderProcedure{id: id, batch: batch, state: procedure.StateInit, relatedVersionInfo: relateVersionInfo}, nil
+}
+
+func buildBatchRelatedVersionInfo(batch []procedure.Procedure) (procedure.RelatedVersionInfo, error) {
+    if len(batch) == 0 {
+        return procedure.RelatedVersionInfo{}, nil
+    }
+
+    result := procedure.RelatedVersionInfo{
+        ClusterID:        batch[0].RelatedVersionInfo().ClusterID,
+        ShardWithVersion: map[storage.ShardID]uint64{},
+        ClusterVersion:   batch[0].RelatedVersionInfo().ClusterVersion,
+    }
+
+    // The version of this batch of procedures must be the same.
+    for _, p := range batch {
+        if p.RelatedVersionInfo().ClusterID != result.ClusterID {
+            return procedure.RelatedVersionInfo{}, errors.WithMessage(procedure.ErrMergeBatchProcedure, "procedure clusterID in the same batch is inconsistent")
+        }
+        if p.RelatedVersionInfo().ClusterVersion != result.ClusterVersion {
+            return procedure.RelatedVersionInfo{}, errors.WithMessage(procedure.ErrMergeBatchProcedure, "procedure clusterVersion in the same batch is inconsistent")
+        }
+        // The ShardVersion of the same shard must be consistent.
+        for shardID, version := range p.RelatedVersionInfo().ShardWithVersion {
+            if resultVersion, exists := result.ShardWithVersion[shardID]; exists {
+                if version != resultVersion {
+                    return procedure.RelatedVersionInfo{}, errors.WithMessage(procedure.ErrMergeBatchProcedure, "procedure shardVersion in the same batch is inconsistent")
+                }
+            } else {
+                result.ShardWithVersion[shardID] = version
+            }
+        }
+    }
+
+    return result, nil
+}
+
+func (p *BatchTransferLeaderProcedure) ID() uint64 {
+    return p.id
+}
+
+func (p *BatchTransferLeaderProcedure) Typ() procedure.Typ {
+    return procedure.TransferLeader
+}
+
+func (p *BatchTransferLeaderProcedure) Start(ctx context.Context) error {
+    // Start procedures with multiple goroutines.
+ g, _ := errgroup.WithContext(ctx) + for _, p := range p.batch { + p := p + g.Go(func() error { + err := p.Start(ctx) + if err != nil { + log.Error("procedure start failed", zap.Error(err), zap.String("procedure", fmt.Sprintf("%v", p))) + } + return err + }) + } + + if err := g.Wait(); err != nil { + p.updateStateWithLock(procedure.StateFailed) + return err + } + + p.updateStateWithLock(procedure.StateFinished) + return nil +} + +func (p *BatchTransferLeaderProcedure) Cancel(_ context.Context) error { + p.updateStateWithLock(procedure.StateCancelled) + return nil +} + +func (p *BatchTransferLeaderProcedure) State() procedure.State { + return p.state +} + +func (p *BatchTransferLeaderProcedure) RelatedVersionInfo() procedure.RelatedVersionInfo { + return p.relatedVersionInfo +} + +func (p *BatchTransferLeaderProcedure) Priority() procedure.Priority { + return p.batch[0].Priority() +} + +func (p *BatchTransferLeaderProcedure) updateStateWithLock(state procedure.State) { + p.lock.Lock() + defer p.lock.Unlock() + + p.state = state +} diff --git a/server/coordinator/procedure/operation/transferleader/batch_transfer_leader_test.go b/server/coordinator/procedure/operation/transferleader/batch_transfer_leader_test.go new file mode 100644 index 00000000..ba72b03b --- /dev/null +++ b/server/coordinator/procedure/operation/transferleader/batch_transfer_leader_test.go @@ -0,0 +1,118 @@ +// Copyright 2023 CeresDB Project Authors. Licensed under Apache-2.0. + +package transferleader_test + +import ( + "context" + "testing" + + "github.com/CeresDB/ceresmeta/server/coordinator/procedure" + "github.com/CeresDB/ceresmeta/server/coordinator/procedure/operation/transferleader" + "github.com/CeresDB/ceresmeta/server/storage" + "github.com/stretchr/testify/require" +) + +type mockProcedure struct { + ClusterID storage.ClusterID + clusterVersion uint64 + typ procedure.Typ + ShardWithVersion map[storage.ShardID]uint64 +} + +func (m mockProcedure) ID() uint64 { + return 0 +} + +func (m mockProcedure) Typ() procedure.Typ { + return m.typ +} + +func (m mockProcedure) Start(_ context.Context) error { + return nil +} + +func (m mockProcedure) Cancel(_ context.Context) error { + return nil +} + +func (m mockProcedure) State() procedure.State { + return procedure.StateInit +} + +func (m mockProcedure) RelatedVersionInfo() procedure.RelatedVersionInfo { + return procedure.RelatedVersionInfo{ + ClusterID: m.ClusterID, + ShardWithVersion: m.ShardWithVersion, + ClusterVersion: m.clusterVersion, + } +} + +func (m mockProcedure) Priority() procedure.Priority { + return procedure.PriorityLow +} + +func TestBatchProcedure(t *testing.T) { + re := require.New(t) + + var procedures []procedure.Procedure + + // Procedures with same type and version. 
+ for i := 0; i < 3; i++ { + shardWithVersion := map[storage.ShardID]uint64{} + shardWithVersion[storage.ShardID(i)] = 0 + p := mockProcedure{ + ClusterID: 0, + clusterVersion: 0, + typ: 0, + ShardWithVersion: shardWithVersion, + } + procedures = append(procedures, p) + } + _, err := transferleader.NewBatchTransferLeaderProcedure(0, procedures) + re.NoError(err) + + // Procedure with different clusterID + for i := 0; i < 3; i++ { + shardWithVersion := map[storage.ShardID]uint64{} + shardWithVersion[storage.ShardID(i)] = 0 + p := mockProcedure{ + ClusterID: storage.ClusterID(i), + clusterVersion: 0, + typ: procedure.TransferLeader, + ShardWithVersion: shardWithVersion, + } + procedures = append(procedures, p) + } + _, err = transferleader.NewBatchTransferLeaderProcedure(0, procedures) + re.Error(err) + + // Procedures with different type. + for i := 0; i < 3; i++ { + shardWithVersion := map[storage.ShardID]uint64{} + shardWithVersion[storage.ShardID(i)] = 0 + p := mockProcedure{ + ClusterID: 0, + clusterVersion: 0, + typ: procedure.Typ(i), + ShardWithVersion: shardWithVersion, + } + procedures = append(procedures, p) + } + _, err = transferleader.NewBatchTransferLeaderProcedure(0, procedures) + re.Error(err) + + // Procedures with different version. + for i := 0; i < 3; i++ { + shardWithVersion := map[storage.ShardID]uint64{} + shardWithVersion[storage.ShardID(0)] = uint64(i) + p := mockProcedure{ + ClusterID: 0, + clusterVersion: 0, + typ: procedure.Typ(i), + ShardWithVersion: shardWithVersion, + } + procedures = append(procedures, p) + } + _, err = transferleader.NewBatchTransferLeaderProcedure(0, procedures) + re.Error(err) +} diff --git a/server/coordinator/scheduler/assign_shard_scheduler.go b/server/coordinator/scheduler/assign_shard_scheduler.go index dedba8ac..1856a643 100644 --- a/server/coordinator/scheduler/assign_shard_scheduler.go +++ b/server/coordinator/scheduler/assign_shard_scheduler.go @@ -4,16 +4,15 @@ package scheduler import ( "context" + "fmt" + "strings" "github.com/CeresDB/ceresmeta/server/cluster/metadata" "github.com/CeresDB/ceresmeta/server/coordinator" + "github.com/CeresDB/ceresmeta/server/coordinator/procedure" "github.com/CeresDB/ceresmeta/server/storage" ) -const ( - AssignReason = "ShardView exists in metadata but shardNode not exists, assign shard to node" -) - // AssignShardScheduler used to assigning shards without nodes. type AssignShardScheduler struct { factory *coordinator.Factory @@ -32,6 +31,8 @@ func (a AssignShardScheduler) Schedule(ctx context.Context, clusterSnapshot meta return ScheduleResult{}, nil } + var procedures []procedure.Procedure + var reasons strings.Builder // Check whether there is a shard without node mapping. 
for _, shardView := range clusterSnapshot.Topology.ShardViewsMapping { _, exists := findNodeByShard(shardView.ShardID, clusterSnapshot.Topology.ClusterView.ShardNodes) @@ -52,12 +53,27 @@ func (a AssignShardScheduler) Schedule(ctx context.Context, clusterSnapshot meta if err != nil { return ScheduleResult{}, err } - return ScheduleResult{ - Procedure: p, - Reason: AssignReason, - }, nil + + procedures = append(procedures, p) + reasons.WriteString(fmt.Sprintf("the shard:%d is not assigned to any node, try to assign it to node:%s", shardView.ShardID, newLeaderNode.Node.Name)) } - return ScheduleResult{}, nil + + if len(procedures) == 0 { + return ScheduleResult{}, nil + } + + batchProcedure, err := a.factory.CreateBatchTransferLeaderProcedure(ctx, coordinator.BatchRequest{ + Batch: procedures, + BatchType: procedure.TransferLeader, + }) + if err != nil { + return ScheduleResult{}, err + } + + return ScheduleResult{ + batchProcedure, + reasons.String(), + }, nil } func findNodeByShard(shardID storage.ShardID, shardNodes []storage.ShardNode) (storage.ShardNode, bool) { diff --git a/server/coordinator/scheduler/assign_shard_scheduler_test.go b/server/coordinator/scheduler/assign_shard_scheduler_test.go index 7493ace8..c2269caa 100644 --- a/server/coordinator/scheduler/assign_shard_scheduler_test.go +++ b/server/coordinator/scheduler/assign_shard_scheduler_test.go @@ -25,17 +25,17 @@ func TestAssignSchedule(t *testing.T) { emptyCluster := test.InitEmptyCluster(ctx, t) result, err := s.Schedule(ctx, emptyCluster.GetMetadata().GetClusterSnapshot()) re.NoError(err) - re.Nil(result.Procedure) + re.Empty(result) // PrepareCluster would be scheduled a transfer leader procedure. prepareCluster := test.InitPrepareCluster(ctx, t) result, err = s.Schedule(ctx, prepareCluster.GetMetadata().GetClusterSnapshot()) re.NoError(err) - re.NotNil(result.Procedure) + re.NotEmpty(result) // StableCluster with all shards assigned would be scheduled an empty procedure. stableCluster := test.InitStableCluster(ctx, t) result, err = s.Schedule(ctx, stableCluster.GetMetadata().GetClusterSnapshot()) re.NoError(err) - re.Nil(result.Procedure) + re.Empty(result) } diff --git a/server/coordinator/scheduler/rebalanced_shard_scheduler.go b/server/coordinator/scheduler/rebalanced_shard_scheduler.go index 7517273e..c4095747 100644 --- a/server/coordinator/scheduler/rebalanced_shard_scheduler.go +++ b/server/coordinator/scheduler/rebalanced_shard_scheduler.go @@ -5,9 +5,11 @@ package scheduler import ( "context" "fmt" + "strings" "github.com/CeresDB/ceresmeta/server/cluster/metadata" "github.com/CeresDB/ceresmeta/server/coordinator" + "github.com/CeresDB/ceresmeta/server/coordinator/procedure" "go.uber.org/zap" ) @@ -31,6 +33,8 @@ func (r RebalancedShardScheduler) Schedule(ctx context.Context, clusterSnapshot return ScheduleResult{}, nil } + var procedures []procedure.Procedure + var reasons strings.Builder // TODO: Improve scheduling efficiency and verify whether the topology changes. 
for _, shardNode := range clusterSnapshot.Topology.ClusterView.ShardNodes { node, err := r.nodePicker.PickNode(ctx, shardNode.ID, clusterSnapshot.RegisteredNodes) @@ -48,12 +52,22 @@ func (r RebalancedShardScheduler) Schedule(ctx context.Context, clusterSnapshot if err != nil { return ScheduleResult{}, err } - return ScheduleResult{ - Procedure: p, - Reason: fmt.Sprintf("the shard:%d on the node:%s does not meet the balance requirements,it should be assigned to node:%s", shardNode.ID, shardNode.NodeName, node.Node.Name), - }, nil + procedures = append(procedures, p) + reasons.WriteString(fmt.Sprintf("the shard:%d on the node:%s does not meet the balance requirements,it should be assigned to node:%s \n", shardNode.ID, shardNode.NodeName, node.Node.Name)) } } - return ScheduleResult{}, nil + if len(procedures) == 0 { + return ScheduleResult{}, nil + } + + batchProcedure, err := r.factory.CreateBatchTransferLeaderProcedure(ctx, coordinator.BatchRequest{ + Batch: procedures, + BatchType: procedure.TransferLeader, + }) + if err != nil { + return ScheduleResult{}, err + } + + return ScheduleResult{batchProcedure, reasons.String()}, nil } diff --git a/server/coordinator/scheduler/rebalanced_shard_scheduler_test.go b/server/coordinator/scheduler/rebalanced_shard_scheduler_test.go index 89c2f80c..d5014c64 100644 --- a/server/coordinator/scheduler/rebalanced_shard_scheduler_test.go +++ b/server/coordinator/scheduler/rebalanced_shard_scheduler_test.go @@ -25,16 +25,16 @@ func TestRebalancedScheduler(t *testing.T) { emptyCluster := test.InitEmptyCluster(ctx, t) result, err := s.Schedule(ctx, emptyCluster.GetMetadata().GetClusterSnapshot()) re.NoError(err) - re.Nil(result.Procedure) + re.Empty(result) // PrepareCluster would be scheduled an empty procedure. prepareCluster := test.InitPrepareCluster(ctx, t) result, err = s.Schedule(ctx, prepareCluster.GetMetadata().GetClusterSnapshot()) re.NoError(err) - re.Nil(result.Procedure) + re.Empty(result) // StableCluster with all shards assigned would be scheduled a load balance procedure. stableCluster := test.InitStableCluster(ctx, t) - result, err = s.Schedule(ctx, stableCluster.GetMetadata().GetClusterSnapshot()) + _, err = s.Schedule(ctx, stableCluster.GetMetadata().GetClusterSnapshot()) re.NoError(err) } diff --git a/server/coordinator/scheduler/scheduler_manager.go b/server/coordinator/scheduler/scheduler_manager.go index c3312c69..833d48a3 100644 --- a/server/coordinator/scheduler/scheduler_manager.go +++ b/server/coordinator/scheduler/scheduler_manager.go @@ -213,16 +213,16 @@ func (m *ManagerImpl) ListScheduler() []Scheduler { func (m *ManagerImpl) Scheduler(ctx context.Context, clusterSnapshot metadata.Snapshot) []ScheduleResult { // TODO: Every scheduler should run in an independent goroutine. 
-    results := make([]ScheduleResult, 0, len(m.registerSchedulers))
+    allResults := make([]ScheduleResult, 0, len(m.registerSchedulers))
     for _, scheduler := range m.registerSchedulers {
         result, err := scheduler.Schedule(ctx, clusterSnapshot)
         if err != nil {
             m.logger.Error("scheduler failed", zap.Error(err))
             continue
         }
-        results = append(results, result)
+        allResults = append(allResults, result)
     }
-    return results
+    return allResults
 }
 
 func (m *ManagerImpl) UpdateEnableSchedule(_ context.Context, enableSchedule bool) {
diff --git a/server/coordinator/scheduler/static_topology_shard_scheduler.go b/server/coordinator/scheduler/static_topology_shard_scheduler.go
index ceb67a3a..2b9105e8 100644
--- a/server/coordinator/scheduler/static_topology_shard_scheduler.go
+++ b/server/coordinator/scheduler/static_topology_shard_scheduler.go
@@ -5,10 +5,12 @@ package scheduler
 import (
     "context"
     "fmt"
+    "strings"
     "time"
 
     "github.com/CeresDB/ceresmeta/server/cluster/metadata"
     "github.com/CeresDB/ceresmeta/server/coordinator"
+    "github.com/CeresDB/ceresmeta/server/coordinator/procedure"
     "github.com/CeresDB/ceresmeta/server/storage"
     "github.com/pkg/errors"
 )
@@ -23,6 +25,9 @@ func NewStaticTopologyShardScheduler(factory *coordinator.Factory, nodePicker co
 }
 
 func (s *StaticTopologyShardScheduler) Schedule(ctx context.Context, clusterSnapshot metadata.Snapshot) (ScheduleResult, error) {
+    var procedures []procedure.Procedure
+    var reasons strings.Builder
+
     switch clusterSnapshot.Topology.ClusterView.State {
     case storage.ClusterStateEmpty:
         return ScheduleResult{}, nil
@@ -47,10 +52,8 @@ func (s *StaticTopologyShardScheduler) Schedule(ctx context.Context, clusterSnap
             if err != nil {
                 return ScheduleResult{}, err
             }
-            return ScheduleResult{
-                Procedure: p,
-                Reason:    fmt.Sprintf("Cluster initialization, shard:%d is assigned to node:%s", shardView.ShardID, newLeaderNode.Node.Name),
-            }, nil
+            procedures = append(procedures, p)
+            reasons.WriteString(fmt.Sprintf("Cluster initialization, shard:%d is assigned to node:%s \n", shardView.ShardID, newLeaderNode.Node.Name))
         }
     case storage.ClusterStateStable:
         for i := 0; i < len(clusterSnapshot.Topology.ClusterView.ShardNodes); i++ {
@@ -70,15 +73,25 @@ func (s *StaticTopologyShardScheduler) Schedule(ctx context.Context, clusterSnap
             if err != nil {
                 return ScheduleResult{}, err
             }
-            return ScheduleResult{
-                Procedure: p,
-                Reason:    fmt.Sprintf("Cluster state is stable, shard:%d is reopened in node:%s", shardNode.ID, node.Node.Name),
-            }, nil
+            procedures = append(procedures, p)
+            reasons.WriteString(fmt.Sprintf("Cluster state is stable, shard:%d is reopened in node:%s \n", shardNode.ID, node.Node.Name))
         }
     }
 
-    return ScheduleResult{}, nil
+    if len(procedures) == 0 {
+        return ScheduleResult{}, nil
+    }
+
+    batchProcedure, err := s.factory.CreateBatchTransferLeaderProcedure(ctx, coordinator.BatchRequest{
+        Batch:     procedures,
+        BatchType: procedure.TransferLeader,
+    })
+    if err != nil {
+        return ScheduleResult{}, err
+    }
+
+    return ScheduleResult{batchProcedure, reasons.String()}, nil
 }
 
 func findOnlineNodeByName(nodeName string, nodes []metadata.RegisteredNode) (metadata.RegisteredNode, error) {
diff --git a/server/coordinator/scheduler/static_topology_shard_scheduler_test.go b/server/coordinator/scheduler/static_topology_shard_scheduler_test.go
index 153d5553..47018623 100644
--- a/server/coordinator/scheduler/static_topology_shard_scheduler_test.go
+++ b/server/coordinator/scheduler/static_topology_shard_scheduler_test.go
@@ -25,17 +25,17 @@ func TestStaticTopologyScheduler(t *testing.T) {
     emptyCluster := test.InitEmptyCluster(ctx, t)
     result, err := s.Schedule(ctx, emptyCluster.GetMetadata().GetClusterSnapshot())
     re.NoError(err)
-    re.Nil(result.Procedure)
+    re.Empty(result)
 
     // PrepareCluster would be scheduled a transfer leader procedure.
     prepareCluster := test.InitPrepareCluster(ctx, t)
     result, err = s.Schedule(ctx, prepareCluster.GetMetadata().GetClusterSnapshot())
     re.NoError(err)
-    re.NotNil(result.Procedure)
+    re.NotEmpty(result)
 
     // StableCluster with all shards assigned would be scheduled a transfer leader procedure by hash rule.
     stableCluster := test.InitStableCluster(ctx, t)
     result, err = s.Schedule(ctx, stableCluster.GetMetadata().GetClusterSnapshot())
     re.NoError(err)
-    re.NotNil(result.Procedure)
+    re.NotEmpty(result)
 }
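
Note on how the pieces above compose (illustrative sketch, not part of this patch): each scheduler now collects one TransferLeaderProcedure per shard, wraps the slice in a single batch via Factory.CreateBatchTransferLeaderProcedure, and returns a single ScheduleResult; calling Start on the batch runs every child procedure in its own goroutine via errgroup and fails the whole batch if any child fails. The helper name runAsBatch below is hypothetical and only uses APIs introduced or already present in this change:

package scheduler

import (
    "context"

    "github.com/CeresDB/ceresmeta/server/coordinator"
    "github.com/CeresDB/ceresmeta/server/coordinator/procedure"
)

// runAsBatch is an illustrative helper: it wraps already-built per-shard
// transfer-leader procedures into one BatchTransferLeaderProcedure and starts it.
func runAsBatch(ctx context.Context, factory *coordinator.Factory, procedures []procedure.Procedure) error {
    // Nothing to do when the scheduler produced no procedures.
    if len(procedures) == 0 {
        return nil
    }

    // The batch is rejected up front (ErrMergeBatchProcedure) if its members
    // disagree on ClusterID, ClusterVersion, or the version of a shared shard.
    batch, err := factory.CreateBatchTransferLeaderProcedure(ctx, coordinator.BatchRequest{
        Batch:     procedures,
        BatchType: procedure.TransferLeader,
    })
    if err != nil {
        return err
    }

    // Start executes the children concurrently and returns the first error.
    return batch.Start(ctx)
}

Since CreateBatchTransferLeaderProcedure in this patch only forwards request.Batch, the BatchType field mainly documents the intended procedure type of the batch.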
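
Review note, suggestion only: BatchTransferLeaderProcedure.State returns p.state without taking p.lock, while Start and Cancel update the state under the lock from other goroutines. If State may be called while the batch is running, the read could be guarded the same way; a possible sketch, not part of this change:

func (p *BatchTransferLeaderProcedure) State() procedure.State {
    // Suggested variant: take the read lock paired with updateStateWithLock,
    // so concurrent Start/Cancel and State calls do not race on p.state.
    p.lock.RLock()
    defer p.lock.RUnlock()

    return p.state
}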