diff --git a/server/coordinator/procedure/manager_impl.go b/server/coordinator/procedure/manager_impl.go index ff37a82c..d60437d1 100644 --- a/server/coordinator/procedure/manager_impl.go +++ b/server/coordinator/procedure/manager_impl.go @@ -75,6 +75,7 @@ func (m *ManagerImpl) Stop(ctx context.Context) error { return nil } +// TODO: Filter duplicate submitted Procedure. func (m *ManagerImpl) Submit(_ context.Context, procedure Procedure) error { if err := m.waitingProcedures.Push(procedure, 0); err != nil { return err diff --git a/server/coordinator/procedure/operation/transferleader/batch_transfer_leader.go b/server/coordinator/procedure/operation/transferleader/batch_transfer_leader.go index 4aa4f54f..791ece36 100644 --- a/server/coordinator/procedure/operation/transferleader/batch_transfer_leader.go +++ b/server/coordinator/procedure/operation/transferleader/batch_transfer_leader.go @@ -63,7 +63,7 @@ func buildBatchRelatedVersionInfo(batch []procedure.Procedure) (procedure.Relate for shardID, version := range p.RelatedVersionInfo().ShardWithVersion { if resultVersion, exists := result.ShardWithVersion[shardID]; exists { if version != resultVersion { - return procedure.RelatedVersionInfo{}, errors.WithMessage(procedure.ErrMergeBatchProcedure, "procedure shardVersion in the same batch is inconsistent") + return procedure.RelatedVersionInfo{}, errors.WithMessage(procedure.ErrMergeBatchProcedure, fmt.Sprintf("procedure shardVersion in the same batch is inconsistent, shardID: %d, expetcdShardVersion: %d, shardVersion: %d", shardID, version, resultVersion)) } } else { result.ShardWithVersion[shardID] = version diff --git a/server/coordinator/scheduler/assign_shard_scheduler.go b/server/coordinator/scheduler/assign_shard_scheduler.go index 1856a643..e609fc8e 100644 --- a/server/coordinator/scheduler/assign_shard_scheduler.go +++ b/server/coordinator/scheduler/assign_shard_scheduler.go @@ -55,7 +55,7 @@ func (a AssignShardScheduler) Schedule(ctx context.Context, clusterSnapshot meta } procedures = append(procedures, p) - reasons.WriteString(fmt.Sprintf("the shard:%d is not assigned to any node, try to assign it to node:%s", shardView.ShardID, newLeaderNode.Node.Name)) + reasons.WriteString(fmt.Sprintf("the shard is not assigned to any node, try to assign it to node, shardID:%d , node:%s. ", shardView.ShardID, newLeaderNode.Node.Name)) } if len(procedures) == 0 { diff --git a/server/coordinator/scheduler/rebalanced_shard_scheduler.go b/server/coordinator/scheduler/rebalanced_shard_scheduler.go index c4095747..dbe7dda9 100644 --- a/server/coordinator/scheduler/rebalanced_shard_scheduler.go +++ b/server/coordinator/scheduler/rebalanced_shard_scheduler.go @@ -53,7 +53,7 @@ func (r RebalancedShardScheduler) Schedule(ctx context.Context, clusterSnapshot return ScheduleResult{}, err } procedures = append(procedures, p) - reasons.WriteString(fmt.Sprintf("the shard:%d on the node:%s does not meet the balance requirements,it should be assigned to node:%s \n", shardNode.ID, shardNode.NodeName, node.Node.Name)) + reasons.WriteString(fmt.Sprintf("the shard:%d on the node:%s does not meet the balance requirements,it should be assigned to node:%s. ", shardNode.ID, shardNode.NodeName, node.Node.Name)) } } diff --git a/server/coordinator/scheduler/scheduler_manager.go b/server/coordinator/scheduler/scheduler_manager.go index 833d48a3..c3312c69 100644 --- a/server/coordinator/scheduler/scheduler_manager.go +++ b/server/coordinator/scheduler/scheduler_manager.go @@ -213,16 +213,16 @@ func (m *ManagerImpl) ListScheduler() []Scheduler { func (m *ManagerImpl) Scheduler(ctx context.Context, clusterSnapshot metadata.Snapshot) []ScheduleResult { // TODO: Every scheduler should run in an independent goroutine. - allResults := make([]ScheduleResult, 0, len(m.registerSchedulers)) + results := make([]ScheduleResult, 0, len(m.registerSchedulers)) for _, scheduler := range m.registerSchedulers { result, err := scheduler.Schedule(ctx, clusterSnapshot) if err != nil { m.logger.Error("scheduler failed", zap.Error(err)) continue } - allResults = append(allResults, result) + results = append(results, result) } - return allResults + return results } func (m *ManagerImpl) UpdateEnableSchedule(_ context.Context, enableSchedule bool) { diff --git a/server/coordinator/scheduler/static_topology_shard_scheduler.go b/server/coordinator/scheduler/static_topology_shard_scheduler.go index 2b9105e8..d8375e51 100644 --- a/server/coordinator/scheduler/static_topology_shard_scheduler.go +++ b/server/coordinator/scheduler/static_topology_shard_scheduler.go @@ -53,7 +53,7 @@ func (s *StaticTopologyShardScheduler) Schedule(ctx context.Context, clusterSnap return ScheduleResult{}, err } procedures = append(procedures, p) - reasons.WriteString(fmt.Sprintf("Cluster initialization, shard:%d is assigned to node:%s \n", shardView.ShardID, newLeaderNode.Node.Name)) + reasons.WriteString(fmt.Sprintf("Cluster initialization, shard:%d is assigned to node:%s. ", shardView.ShardID, newLeaderNode.Node.Name)) } case storage.ClusterStateStable: for i := 0; i < len(clusterSnapshot.Topology.ClusterView.ShardNodes); i++ { @@ -74,7 +74,7 @@ func (s *StaticTopologyShardScheduler) Schedule(ctx context.Context, clusterSnap return ScheduleResult{}, err } procedures = append(procedures, p) - reasons.WriteString(fmt.Sprintf("Cluster initialization, shard:%d is assigned to node:%s \n", shardNode.ID, node.Node.Name)) + reasons.WriteString(fmt.Sprintf("Cluster initialization, shard:%d is assigned to node:%s. ", shardNode.ID, node.Node.Name)) } } }