diff --git a/server/coordinator/factory.go b/server/coordinator/factory.go
index 8e46d870..df408484 100644
--- a/server/coordinator/factory.go
+++ b/server/coordinator/factory.go
@@ -81,6 +81,11 @@ type CreatePartitionTableRequest struct {
     OnFailed func(error) error
 }
 
+type BatchRequest struct {
+    Batch     []procedure.Procedure
+    BatchType procedure.Typ
+}
+
 func NewFactory(allocator id.Allocator, dispatch eventdispatch.Dispatch, storage procedure.Storage) *Factory {
     return &Factory{
         idAllocator: allocator,
@@ -253,6 +258,15 @@ func (f *Factory) CreateSplitProcedure(ctx context.Context, request SplitRequest
     )
 }
 
+func (f *Factory) CreateBatchProcedure(ctx context.Context, request BatchRequest) (procedure.Procedure, error) {
+    id, err := f.allocProcedureID(ctx)
+    if err != nil {
+        return nil, err
+    }
+
+    return procedure.NewBatchProcedure(id, request.Batch, request.BatchType)
+}
+
 func (f *Factory) allocProcedureID(ctx context.Context) (uint64, error) {
     id, err := f.idAllocator.Alloc(ctx)
     if err != nil {
diff --git a/server/coordinator/procedure/batch_procedure.go b/server/coordinator/procedure/batch_procedure.go
new file mode 100644
index 00000000..e8b9a692
--- /dev/null
+++ b/server/coordinator/procedure/batch_procedure.go
@@ -0,0 +1,128 @@
+package procedure
+
+import (
+    "context"
+    "fmt"
+    "github.com/CeresDB/ceresmeta/pkg/log"
+    "github.com/CeresDB/ceresmeta/server/storage"
+    "github.com/pkg/errors"
+    "go.uber.org/zap"
+    "golang.org/x/sync/errgroup"
+    "sync"
+)
+
+type BatchProcedure struct {
+    id                 uint64
+    typ                Typ
+    batch              []Procedure
+    relatedVersionInfo RelatedVersionInfo
+
+    // Protect the state.
+    lock  sync.RWMutex
+    state State
+}
+
+func NewBatchProcedure(id uint64, batch []Procedure, typ Typ) (Procedure, error) {
+    if len(batch) == 0 {
+        return nil, ErrEmptyBatchProcedure
+    }
+
+    relatedVersionInfo, err := buildRelatedVersionInfo(batch, typ)
+    if err != nil {
+        return nil, err
+    }
+
+    return &BatchProcedure{id: id, batch: batch, state: StateInit, typ: typ, relatedVersionInfo: relatedVersionInfo}, nil
+}
+
+func buildRelatedVersionInfo(batch []Procedure, typ Typ) (RelatedVersionInfo, error) {
+    if len(batch) == 0 {
+        return RelatedVersionInfo{}, nil
+    }
+
+    result := RelatedVersionInfo{
+        ClusterID:        batch[0].RelatedVersionInfo().ClusterID,
+        ShardWithVersion: map[storage.ShardID]uint64{},
+        ClusterVersion:   batch[0].RelatedVersionInfo().ClusterVersion,
+    }
+
+    // All procedures in the batch must share the same type, cluster ID and cluster version.
+    for _, p := range batch {
+        if p.Typ() != typ {
+            return RelatedVersionInfo{}, errors.WithMessage(ErrMergeBatchProcedure, "procedure type in the same batch is inconsistent")
+        }
+        if p.RelatedVersionInfo().ClusterID != result.ClusterID {
+            return RelatedVersionInfo{}, errors.WithMessage(ErrMergeBatchProcedure, "procedure clusterID in the same batch is inconsistent")
+        }
+        if p.RelatedVersionInfo().ClusterVersion != result.ClusterVersion {
+            return RelatedVersionInfo{}, errors.WithMessage(ErrMergeBatchProcedure, "procedure clusterVersion in the same batch is inconsistent")
+        }
+        // The ShardVersion of the same shard must be consistent.
+        for shardID, version := range p.RelatedVersionInfo().ShardWithVersion {
+            if resultVersion, exists := result.ShardWithVersion[shardID]; exists {
+                if version != resultVersion {
+                    return RelatedVersionInfo{}, errors.WithMessage(ErrMergeBatchProcedure, "procedure shardVersion in the same batch is inconsistent")
+                }
+            } else {
+                result.ShardWithVersion[shardID] = version
+            }
+        }
+    }
+
+    return result, nil
+}
+
+func (p *BatchProcedure) ID() uint64 {
+    return p.id
+}
+
+func (p *BatchProcedure) Typ() Typ {
+    return p.typ
+}
+
+func (p *BatchProcedure) Start(ctx context.Context) error {
+    // Start the procedures in the batch concurrently, one goroutine per procedure.
+    g, _ := errgroup.WithContext(ctx)
+    for _, p := range p.batch {
+        p := p
+        g.Go(func() error {
+            err := p.Start(ctx)
+            if err != nil {
+                log.Error("procedure start failed", zap.Error(err), zap.String("procedure", fmt.Sprintf("%v", p)))
+            }
+            return err
+        })
+    }
+
+    if err := g.Wait(); err != nil {
+        return err
+    }
+
+    return nil
+}
+
+func (p *BatchProcedure) Cancel(_ context.Context) error {
+    p.updateStateWithLock(StateCancelled)
+    return nil
+}
+
+func (p *BatchProcedure) State() State {
+    p.lock.RLock()
+    defer p.lock.RUnlock()
+    return p.state
+}
+
+func (p *BatchProcedure) RelatedVersionInfo() RelatedVersionInfo {
+    return p.relatedVersionInfo
+}
+
+func (p *BatchProcedure) Priority() Priority {
+    return p.batch[0].Priority()
+}
+
+func (p *BatchProcedure) updateStateWithLock(state State) {
+    p.lock.Lock()
+    defer p.lock.Unlock()
+
+    p.state = state
+}
diff --git a/server/coordinator/procedure/error.go b/server/coordinator/procedure/error.go
index 44c2f5bb..29a8864f 100644
--- a/server/coordinator/procedure/error.go
+++ b/server/coordinator/procedure/error.go
@@ -22,4 +22,6 @@ var (
     ErrQueueFull               = coderr.NewCodeError(coderr.Internal, "queue is full, unable to offer more data")
     ErrPushDuplicatedProcedure = coderr.NewCodeError(coderr.Internal, "try to push duplicated procedure")
     ErrShardNumberNotEnough    = coderr.NewCodeError(coderr.Internal, "shard number not enough")
+    ErrEmptyBatchProcedure     = coderr.NewCodeError(coderr.Internal, "procedure batch is empty")
+    ErrMergeBatchProcedure     = coderr.NewCodeError(coderr.Internal, "failed to merge procedures batch")
 )
diff --git a/server/coordinator/scheduler/assign_shard_scheduler.go b/server/coordinator/scheduler/assign_shard_scheduler.go
index dedba8ac..54e28fe5 100644
--- a/server/coordinator/scheduler/assign_shard_scheduler.go
+++ b/server/coordinator/scheduler/assign_shard_scheduler.go
@@ -4,16 +4,15 @@ package scheduler
 
 import (
     "context"
+    "fmt"
+    "github.com/CeresDB/ceresmeta/server/coordinator/procedure"
+    "strings"
 
     "github.com/CeresDB/ceresmeta/server/cluster/metadata"
     "github.com/CeresDB/ceresmeta/server/coordinator"
     "github.com/CeresDB/ceresmeta/server/storage"
 )
 
-const (
-    AssignReason = "ShardView exists in metadata but shardNode not exists, assign shard to node"
-)
-
 // AssignShardScheduler used to assigning shards without nodes.
 type AssignShardScheduler struct {
     factory *coordinator.Factory
@@ -32,6 +31,8 @@ func (a AssignShardScheduler) Schedule(ctx context.Context, clusterSnapshot meta
         return ScheduleResult{}, nil
     }
 
+    var procedures []procedure.Procedure
+    var reasons strings.Builder
     // Check whether there is a shard without node mapping.
     for _, shardView := range clusterSnapshot.Topology.ShardViewsMapping {
         _, exists := findNodeByShard(shardView.ShardID, clusterSnapshot.Topology.ClusterView.ShardNodes)
@@ -52,12 +53,27 @@ func (a AssignShardScheduler) Schedule(ctx context.Context, clusterSnapshot meta
         if err != nil {
             return ScheduleResult{}, err
         }
-        return ScheduleResult{
-            Procedure: p,
-            Reason:    AssignReason,
-        }, nil
+
+        procedures = append(procedures, p)
+        reasons.WriteString(fmt.Sprintf("the shard:%d is not assigned to any node, try to assign it to node:%s\n", shardView.ShardID, newLeaderNode.Node.Name))
     }
-    return ScheduleResult{}, nil
+
+    if len(procedures) == 0 {
+        return ScheduleResult{}, nil
+    }
+
+    batchProcedure, err := a.factory.CreateBatchProcedure(ctx, coordinator.BatchRequest{
+        Batch:     procedures,
+        BatchType: procedure.TransferLeader,
+    })
+    if err != nil {
+        return ScheduleResult{}, err
+    }
+
+    return ScheduleResult{
+        Procedure: batchProcedure,
+        Reason:    reasons.String(),
+    }, nil
 }
 
 func findNodeByShard(shardID storage.ShardID, shardNodes []storage.ShardNode) (storage.ShardNode, bool) {
diff --git a/server/coordinator/scheduler/assign_shard_scheduler_test.go b/server/coordinator/scheduler/assign_shard_scheduler_test.go
index 7493ace8..c2269caa 100644
--- a/server/coordinator/scheduler/assign_shard_scheduler_test.go
+++ b/server/coordinator/scheduler/assign_shard_scheduler_test.go
@@ -25,17 +25,17 @@ func TestAssignSchedule(t *testing.T) {
     emptyCluster := test.InitEmptyCluster(ctx, t)
     result, err := s.Schedule(ctx, emptyCluster.GetMetadata().GetClusterSnapshot())
     re.NoError(err)
-    re.Nil(result.Procedure)
+    re.Empty(result)
 
     // PrepareCluster would be scheduled a transfer leader procedure.
     prepareCluster := test.InitPrepareCluster(ctx, t)
     result, err = s.Schedule(ctx, prepareCluster.GetMetadata().GetClusterSnapshot())
     re.NoError(err)
-    re.NotNil(result.Procedure)
+    re.NotEmpty(result)
 
     // StableCluster with all shards assigned would be scheduled an empty procedure.
     stableCluster := test.InitStableCluster(ctx, t)
     result, err = s.Schedule(ctx, stableCluster.GetMetadata().GetClusterSnapshot())
     re.NoError(err)
-    re.Nil(result.Procedure)
+    re.Empty(result)
 }
diff --git a/server/coordinator/scheduler/rebalanced_shard_scheduler.go b/server/coordinator/scheduler/rebalanced_shard_scheduler.go
index 7517273e..83083629 100644
--- a/server/coordinator/scheduler/rebalanced_shard_scheduler.go
+++ b/server/coordinator/scheduler/rebalanced_shard_scheduler.go
@@ -5,6 +5,8 @@ package scheduler
 import (
     "context"
     "fmt"
+    "github.com/CeresDB/ceresmeta/server/coordinator/procedure"
+    "strings"
 
     "github.com/CeresDB/ceresmeta/server/cluster/metadata"
     "github.com/CeresDB/ceresmeta/server/coordinator"
@@ -31,6 +33,8 @@ func (r RebalancedShardScheduler) Schedule(ctx context.Context, clusterSnapshot
         return ScheduleResult{}, nil
     }
 
+    var procedures []procedure.Procedure
+    var reasons strings.Builder
     // TODO: Improve scheduling efficiency and verify whether the topology changes.
     for _, shardNode := range clusterSnapshot.Topology.ClusterView.ShardNodes {
         node, err := r.nodePicker.PickNode(ctx, shardNode.ID, clusterSnapshot.RegisteredNodes)
@@ -48,12 +52,22 @@ func (r RebalancedShardScheduler) Schedule(ctx context.Context, clusterSnapshot
             if err != nil {
                 return ScheduleResult{}, err
             }
-            return ScheduleResult{
-                Procedure: p,
-                Reason:    fmt.Sprintf("the shard:%d on the node:%s does not meet the balance requirements,it should be assigned to node:%s", shardNode.ID, shardNode.NodeName, node.Node.Name),
-            }, nil
+            procedures = append(procedures, p)
+            reasons.WriteString(fmt.Sprintf("the shard:%d on the node:%s does not meet the balance requirements, it should be assigned to node:%s\n", shardNode.ID, shardNode.NodeName, node.Node.Name))
         }
     }
 
-    return ScheduleResult{}, nil
+    if len(procedures) == 0 {
+        return ScheduleResult{}, nil
+    }
+
+    batchProcedure, err := r.factory.CreateBatchProcedure(ctx, coordinator.BatchRequest{
+        Batch:     procedures,
+        BatchType: procedure.TransferLeader,
+    })
+    if err != nil {
+        return ScheduleResult{}, err
+    }
+
+    return ScheduleResult{Procedure: batchProcedure, Reason: reasons.String()}, nil
 }
diff --git a/server/coordinator/scheduler/rebalanced_shard_scheduler_test.go b/server/coordinator/scheduler/rebalanced_shard_scheduler_test.go
index 89c2f80c..decba76d 100644
--- a/server/coordinator/scheduler/rebalanced_shard_scheduler_test.go
+++ b/server/coordinator/scheduler/rebalanced_shard_scheduler_test.go
@@ -25,13 +25,13 @@ func TestRebalancedScheduler(t *testing.T) {
     emptyCluster := test.InitEmptyCluster(ctx, t)
     result, err := s.Schedule(ctx, emptyCluster.GetMetadata().GetClusterSnapshot())
     re.NoError(err)
-    re.Nil(result.Procedure)
+    re.Empty(result)
 
     // PrepareCluster would be scheduled an empty procedure.
     prepareCluster := test.InitPrepareCluster(ctx, t)
     result, err = s.Schedule(ctx, prepareCluster.GetMetadata().GetClusterSnapshot())
     re.NoError(err)
-    re.Nil(result.Procedure)
+    re.Empty(result)
 
     // StableCluster with all shards assigned would be scheduled a load balance procedure.
     stableCluster := test.InitStableCluster(ctx, t)
diff --git a/server/coordinator/scheduler/scheduler_manager.go b/server/coordinator/scheduler/scheduler_manager.go
index c3312c69..833d48a3 100644
--- a/server/coordinator/scheduler/scheduler_manager.go
+++ b/server/coordinator/scheduler/scheduler_manager.go
@@ -213,16 +213,16 @@ func (m *ManagerImpl) ListScheduler() []Scheduler {
 
 func (m *ManagerImpl) Scheduler(ctx context.Context, clusterSnapshot metadata.Snapshot) []ScheduleResult {
     // TODO: Every scheduler should run in an independent goroutine.
-    results := make([]ScheduleResult, 0, len(m.registerSchedulers))
+    allResults := make([]ScheduleResult, 0, len(m.registerSchedulers))
     for _, scheduler := range m.registerSchedulers {
         result, err := scheduler.Schedule(ctx, clusterSnapshot)
         if err != nil {
             m.logger.Error("scheduler failed", zap.Error(err))
             continue
         }
-        results = append(results, result)
+        allResults = append(allResults, result)
     }
-    return results
+    return allResults
 }
 
 func (m *ManagerImpl) UpdateEnableSchedule(_ context.Context, enableSchedule bool) {
diff --git a/server/coordinator/scheduler/static_topology_shard_scheduler.go b/server/coordinator/scheduler/static_topology_shard_scheduler.go
index ceb67a3a..0f15039f 100644
--- a/server/coordinator/scheduler/static_topology_shard_scheduler.go
+++ b/server/coordinator/scheduler/static_topology_shard_scheduler.go
@@ -5,6 +5,8 @@ package scheduler
 import (
     "context"
     "fmt"
+    "github.com/CeresDB/ceresmeta/server/coordinator/procedure"
+    "strings"
     "time"
 
     "github.com/CeresDB/ceresmeta/server/cluster/metadata"
@@ -23,6 +25,9 @@ func NewStaticTopologyShardScheduler(factory *coordinator.Factory, nodePicker co
 }
 
 func (s *StaticTopologyShardScheduler) Schedule(ctx context.Context, clusterSnapshot metadata.Snapshot) (ScheduleResult, error) {
+    var procedures []procedure.Procedure
+    var reasons strings.Builder
+
     switch clusterSnapshot.Topology.ClusterView.State {
     case storage.ClusterStateEmpty:
         return ScheduleResult{}, nil
@@ -47,10 +52,8 @@ func (s *StaticTopologyShardScheduler) Schedule(ctx context.Context, clusterSnap
             if err != nil {
                 return ScheduleResult{}, err
             }
-            return ScheduleResult{
-                Procedure: p,
-                Reason:    fmt.Sprintf("Cluster initialization, shard:%d is assigned to node:%s", shardView.ShardID, newLeaderNode.Node.Name),
-            }, nil
+            procedures = append(procedures, p)
+            reasons.WriteString(fmt.Sprintf("Cluster initialization, shard:%d is assigned to node:%s\n", shardView.ShardID, newLeaderNode.Node.Name))
         }
     case storage.ClusterStateStable:
         for i := 0; i < len(clusterSnapshot.Topology.ClusterView.ShardNodes); i++ {
@@ -70,15 +73,25 @@ func (s *StaticTopologyShardScheduler) Schedule(ctx context.Context, clusterSnap
             if err != nil {
                 return ScheduleResult{}, err
             }
 
-            return ScheduleResult{
-                Procedure: p,
-                Reason:    fmt.Sprintf("Cluster state is stable, shard:%d is reopened in node:%s", shardNode.ID, node.Node.Name),
-            }, nil
+            procedures = append(procedures, p)
+            reasons.WriteString(fmt.Sprintf("Cluster state is stable, shard:%d is reopened in node:%s\n", shardNode.ID, node.Node.Name))
         }
     }
 
-    return ScheduleResult{}, nil
+    if len(procedures) == 0 {
+        return ScheduleResult{}, nil
+    }
+
+    batchProcedure, err := s.factory.CreateBatchProcedure(ctx, coordinator.BatchRequest{
+        Batch:     procedures,
+        BatchType: procedure.TransferLeader,
+    })
+    if err != nil {
+        return ScheduleResult{}, err
+    }
+
+    return ScheduleResult{Procedure: batchProcedure, Reason: reasons.String()}, nil
 }
 
 func findOnlineNodeByName(nodeName string, nodes []metadata.RegisteredNode) (metadata.RegisteredNode, error) {
diff --git a/server/coordinator/scheduler/static_topology_shard_scheduler_test.go b/server/coordinator/scheduler/static_topology_shard_scheduler_test.go
index 153d5553..47018623 100644
--- a/server/coordinator/scheduler/static_topology_shard_scheduler_test.go
+++ b/server/coordinator/scheduler/static_topology_shard_scheduler_test.go
@@ -25,17 +25,17 @@ func TestStaticTopologyScheduler(t *testing.T) {
     emptyCluster := test.InitEmptyCluster(ctx, t)
     result, err := s.Schedule(ctx, emptyCluster.GetMetadata().GetClusterSnapshot())
     re.NoError(err)
-    re.Nil(result.Procedure)
+    re.Empty(result)
 
     // PrepareCluster would be scheduled a transfer leader procedure.
     prepareCluster := test.InitPrepareCluster(ctx, t)
     result, err = s.Schedule(ctx, prepareCluster.GetMetadata().GetClusterSnapshot())
     re.NoError(err)
-    re.NotNil(result.Procedure)
+    re.NotEmpty(result)
 
     // StableCluster with all shards assigned would be scheduled a transfer leader procedure by hash rule.
     stableCluster := test.InitStableCluster(ctx, t)
    result, err = s.Schedule(ctx, stableCluster.GetMetadata().GetClusterSnapshot())
     re.NoError(err)
-    re.NotNil(result.Procedure)
+    re.NotEmpty(result)
 }
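
Note for reviewers: after this change the three schedulers end with the same tail (collect per-shard procedures, bail out on an empty batch, wrap the rest in a batch procedure). A follow-up could extract that tail into a shared helper; a minimal sketch is below. The helper name `buildBatchResult` is hypothetical and not part of this diff, but every identifier it uses (`Factory.CreateBatchProcedure`, `coordinator.BatchRequest`, `procedure.TransferLeader`, `ScheduleResult`) appears in the change above.

```go
package scheduler

import (
	"context"

	"github.com/CeresDB/ceresmeta/server/coordinator"
	"github.com/CeresDB/ceresmeta/server/coordinator/procedure"
)

// buildBatchResult is a hypothetical helper that folds the per-shard procedures
// collected by a scheduler into a single batch procedure. All schedulers in this
// diff build transfer-leader batches, so the batch type is fixed here.
func buildBatchResult(ctx context.Context, factory *coordinator.Factory, procedures []procedure.Procedure, reason string) (ScheduleResult, error) {
	// NewBatchProcedure rejects an empty batch with ErrEmptyBatchProcedure,
	// so an idle schedule round returns an empty result instead.
	if len(procedures) == 0 {
		return ScheduleResult{}, nil
	}

	batchProcedure, err := factory.CreateBatchProcedure(ctx, coordinator.BatchRequest{
		Batch:     procedures,
		BatchType: procedure.TransferLeader,
	})
	if err != nil {
		return ScheduleResult{}, err
	}

	return ScheduleResult{Procedure: batchProcedure, Reason: reason}, nil
}
```

With such a helper, each scheduler would only keep its per-shard loop and then call `buildBatchResult(ctx, factory, procedures, reasons.String())`, removing the triplicated batching code.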
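
The new constructor's guard rails (empty batch, mixed procedure types) are not exercised by the scheduler tests above. A test along the following lines could cover the empty-batch path; it relies only on identifiers introduced in this diff (`NewBatchProcedure`, `ErrEmptyBatchProcedure`, `procedure.TransferLeader`), while the package layout and test name are placeholders.

```go
package procedure_test

import (
	"testing"

	"github.com/CeresDB/ceresmeta/server/coordinator/procedure"
	"github.com/stretchr/testify/require"
)

// TestNewBatchProcedureEmptyBatch checks that creating a batch procedure from an
// empty batch fails with ErrEmptyBatchProcedure instead of producing a no-op procedure.
func TestNewBatchProcedureEmptyBatch(t *testing.T) {
	re := require.New(t)

	_, err := procedure.NewBatchProcedure(uint64(1), nil, procedure.TransferLeader)
	re.ErrorIs(err, procedure.ErrEmptyBatchProcedure)
}
```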