
Commit

refactor: refactor by cr
ZuLiangWang committed Jul 3, 2023
1 parent d978368 commit 8da5749
Showing 6 changed files with 9 additions and 8 deletions.
1 change: 1 addition & 0 deletions server/coordinator/procedure/manager_impl.go
@@ -75,6 +75,7 @@ func (m *ManagerImpl) Stop(ctx context.Context) error {
 	return nil
 }
 
+// TODO: Filter duplicate submitted Procedure.
 func (m *ManagerImpl) Submit(_ context.Context, procedure Procedure) error {
 	if err := m.waitingProcedures.Push(procedure, 0); err != nil {
 		return err
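
The TODO added above is left unimplemented in this commit. A minimal sketch of one way to filter duplicate submissions, assuming each Procedure carries a unique ID(); the trimmed interface and push hook below are illustrative stand-ins, not the repository's real types:

package procedure

import "sync"

// Procedure is trimmed to the one method this sketch needs; the real
// interface in this repository is larger.
type Procedure interface {
	ID() uint64
}

// dedupSubmitter remembers the IDs of procedures that are already queued
// and silently drops repeated submissions. The queued map must be
// initialized before use.
type dedupSubmitter struct {
	mu     sync.Mutex
	queued map[uint64]struct{}
	push   func(Procedure) error // underlying queue push, e.g. waitingProcedures.Push
}

func (d *dedupSubmitter) Submit(p Procedure) error {
	d.mu.Lock()
	defer d.mu.Unlock()
	if _, ok := d.queued[p.ID()]; ok {
		return nil // duplicate: already waiting, nothing to do
	}
	if err := d.push(p); err != nil {
		return err
	}
	d.queued[p.ID()] = struct{}{}
	return nil
}

A real implementation would also drop an ID once its procedure finishes, otherwise a completed procedure could never be resubmitted.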
@@ -63,7 +63,7 @@ func buildBatchRelatedVersionInfo(batch []procedure.Procedure) (procedure.RelatedVersionInfo, error) {
 		for shardID, version := range p.RelatedVersionInfo().ShardWithVersion {
 			if resultVersion, exists := result.ShardWithVersion[shardID]; exists {
 				if version != resultVersion {
-					return procedure.RelatedVersionInfo{}, errors.WithMessage(procedure.ErrMergeBatchProcedure, "procedure shardVersion in the same batch is inconsistent")
+					return procedure.RelatedVersionInfo{}, errors.WithMessage(procedure.ErrMergeBatchProcedure, fmt.Sprintf("procedure shardVersion in the same batch is inconsistent, shardID: %d, expectedShardVersion: %d, shardVersion: %d", shardID, version, resultVersion))
 				}
 			} else {
 				result.ShardWithVersion[shardID] = version
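
For context, the check above enforces that every procedure in a batch reports the same version for any shard they share. A standalone sketch of that merge rule, with plain maps standing in for the real RelatedVersionInfo type:

package main

import "fmt"

// mergeShardVersions mirrors the consistency check above using plain maps:
// if two procedures in the batch disagree on a shard's version, the merge fails.
func mergeShardVersions(batch []map[uint64]uint64) (map[uint64]uint64, error) {
	result := make(map[uint64]uint64)
	for _, shardWithVersion := range batch {
		for shardID, version := range shardWithVersion {
			if resultVersion, exists := result[shardID]; exists && version != resultVersion {
				return nil, fmt.Errorf("inconsistent version for shard %d: %d vs %d", shardID, resultVersion, version)
			}
			result[shardID] = version
		}
	}
	return result, nil
}

func main() {
	// Two procedures disagree on shard 1, so the merge is rejected.
	_, err := mergeShardVersions([]map[uint64]uint64{{1: 7}, {1: 8}})
	fmt.Println(err) // inconsistent version for shard 1: 7 vs 8
}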
2 changes: 1 addition & 1 deletion server/coordinator/scheduler/assign_shard_scheduler.go
@@ -55,7 +55,7 @@ func (a AssignShardScheduler) Schedule(ctx context.Context, clusterSnapshot metadata.Snapshot) (ScheduleResult, error) {
 		}
 
 		procedures = append(procedures, p)
-		reasons.WriteString(fmt.Sprintf("the shard:%d is not assigned to any node, try to assign it to node:%s", shardView.ShardID, newLeaderNode.Node.Name))
+		reasons.WriteString(fmt.Sprintf("the shard is not assigned to any node, try to assign it to node, shardID:%d, node:%s. ", shardView.ShardID, newLeaderNode.Node.Name))
 	}
 
 	if len(procedures) == 0 {
2 changes: 1 addition & 1 deletion server/coordinator/scheduler/rebalanced_shard_scheduler.go
@@ -53,7 +53,7 @@ func (r RebalancedShardScheduler) Schedule(ctx context.Context, clusterSnapshot metadata.Snapshot) (ScheduleResult, error) {
 				return ScheduleResult{}, err
 			}
 			procedures = append(procedures, p)
-			reasons.WriteString(fmt.Sprintf("the shard:%d on the node:%s does not meet the balance requirements,it should be assigned to node:%s \n", shardNode.ID, shardNode.NodeName, node.Node.Name))
+			reasons.WriteString(fmt.Sprintf("the shard:%d on the node:%s does not meet the balance requirements, it should be assigned to node:%s. ", shardNode.ID, shardNode.NodeName, node.Node.Name))
 		}
 	}
 
6 changes: 3 additions & 3 deletions server/coordinator/scheduler/scheduler_manager.go
@@ -213,16 +213,16 @@ func (m *ManagerImpl) ListScheduler() []Scheduler {

 func (m *ManagerImpl) Scheduler(ctx context.Context, clusterSnapshot metadata.Snapshot) []ScheduleResult {
 	// TODO: Every scheduler should run in an independent goroutine.
-	allResults := make([]ScheduleResult, 0, len(m.registerSchedulers))
+	results := make([]ScheduleResult, 0, len(m.registerSchedulers))
 	for _, scheduler := range m.registerSchedulers {
 		result, err := scheduler.Schedule(ctx, clusterSnapshot)
 		if err != nil {
 			m.logger.Error("scheduler failed", zap.Error(err))
 			continue
 		}
-		allResults = append(allResults, result)
+		results = append(results, result)
 	}
-	return allResults
+	return results
 }
 
 func (m *ManagerImpl) UpdateEnableSchedule(_ context.Context, enableSchedule bool) {
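
The pre-existing TODO in Scheduler is untouched by this rename. A sketch of the concurrent variant it calls for, using a WaitGroup and a mutex-guarded result slice; the trimmed types below are illustrative stand-ins and error logging is elided:

package main

import (
	"context"
	"fmt"
	"sync"
)

// ScheduleResult and Scheduler are simplified stand-ins for the real types;
// the real Schedule also takes a metadata.Snapshot.
type ScheduleResult struct{}

type Scheduler interface {
	Schedule(ctx context.Context) (ScheduleResult, error)
}

// runConcurrently runs every scheduler in its own goroutine and collects the
// successful results; failed schedulers would be logged and skipped, as in
// the sequential version above.
func runConcurrently(ctx context.Context, schedulers []Scheduler) []ScheduleResult {
	var (
		mu      sync.Mutex
		wg      sync.WaitGroup
		results = make([]ScheduleResult, 0, len(schedulers))
	)
	for _, s := range schedulers {
		wg.Add(1)
		go func(s Scheduler) {
			defer wg.Done()
			result, err := s.Schedule(ctx)
			if err != nil {
				return // log-and-skip in the real implementation
			}
			mu.Lock()
			results = append(results, result)
			mu.Unlock()
		}(s)
	}
	wg.Wait()
	return results
}

type noopScheduler struct{}

func (noopScheduler) Schedule(context.Context) (ScheduleResult, error) {
	return ScheduleResult{}, nil
}

func main() {
	results := runConcurrently(context.Background(), []Scheduler{noopScheduler{}, noopScheduler{}})
	fmt.Println(len(results)) // 2
}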
@@ -53,7 +53,7 @@ func (s *StaticTopologyShardScheduler) Schedule(ctx context.Context, clusterSnapshot metadata.Snapshot) (ScheduleResult, error) {
 				return ScheduleResult{}, err
 			}
 			procedures = append(procedures, p)
-			reasons.WriteString(fmt.Sprintf("Cluster initialization, shard:%d is assigned to node:%s \n", shardView.ShardID, newLeaderNode.Node.Name))
+			reasons.WriteString(fmt.Sprintf("Cluster initialization, shard:%d is assigned to node:%s. ", shardView.ShardID, newLeaderNode.Node.Name))
 		}
 	case storage.ClusterStateStable:
 		for i := 0; i < len(clusterSnapshot.Topology.ClusterView.ShardNodes); i++ {
@@ -74,7 +74,7 @@ func (s *StaticTopologyShardScheduler) Schedule(ctx context.Context, clusterSnapshot metadata.Snapshot) (ScheduleResult, error) {
 				return ScheduleResult{}, err
 			}
 			procedures = append(procedures, p)
-			reasons.WriteString(fmt.Sprintf("Cluster initialization, shard:%d is assigned to node:%s \n", shardNode.ID, node.Node.Name))
+			reasons.WriteString(fmt.Sprintf("Cluster initialization, shard:%d is assigned to node:%s. ", shardNode.ID, node.Node.Name))
 		}
 	}
 }

