Skip to content
This repository has been archived by the owner on Feb 6, 2024. It is now read-only.

Commit

Permalink
refactor: refactor by cr
Browse files Browse the repository at this point in the history
  • Loading branch information
ZuLiangWang committed Jul 4, 2023
1 parent 8da5749 commit 8980f4b
Show file tree
Hide file tree
Showing 4 changed files with 21 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,66 +53,46 @@ func (m mockProcedure) Priority() procedure.Priority {

func TestBatchProcedure(t *testing.T) {
re := require.New(t)

var procedures []procedure.Procedure

// Procedures with same type and version.
for i := 0; i < 3; i++ {
shardWithVersion := map[storage.ShardID]uint64{}
shardWithVersion[storage.ShardID(i)] = 0
p := mockProcedure{
ClusterID: 0,
clusterVersion: 0,
typ: 0,
ShardWithVersion: shardWithVersion,
}
procedures = append(procedures, p)
procedures = createMockProcedure(i, 0, 0, 0, procedures)
}
_, err := transferleader.NewBatchTransferLeaderProcedure(0, procedures)
re.NoError(err)

// Procedure with different clusterID
for i := 0; i < 3; i++ {
shardWithVersion := map[storage.ShardID]uint64{}
shardWithVersion[storage.ShardID(i)] = 0
p := mockProcedure{
ClusterID: storage.ClusterID(i),
clusterVersion: 0,
typ: procedure.TransferLeader,
ShardWithVersion: shardWithVersion,
}
procedures = append(procedures, p)
createMockProcedure(i, uint64(i), 0, procedure.TransferLeader, procedures)
}
_, err = transferleader.NewBatchTransferLeaderProcedure(0, procedures)
re.Error(err)

// Procedures with different type.
for i := 0; i < 3; i++ {
shardWithVersion := map[storage.ShardID]uint64{}
shardWithVersion[storage.ShardID(i)] = 0
p := mockProcedure{
ClusterID: 0,
clusterVersion: 0,
typ: procedure.Typ(i),
ShardWithVersion: shardWithVersion,
}
procedures = append(procedures, p)
createMockProcedure(i, 0, 0, procedure.Typ(i), procedures)
}
_, err = transferleader.NewBatchTransferLeaderProcedure(0, procedures)
re.Error(err)

// Procedures with different version.
for i := 0; i < 3; i++ {
shardWithVersion := map[storage.ShardID]uint64{}
shardWithVersion[storage.ShardID(0)] = uint64(i)
p := mockProcedure{
ClusterID: 0,
clusterVersion: 0,
typ: procedure.Typ(i),
ShardWithVersion: shardWithVersion,
}
procedures = append(procedures, p)
createMockProcedure(0, 0, 0, procedure.Typ(i), procedures)
}
_, err = transferleader.NewBatchTransferLeaderProcedure(0, procedures)
re.Error(err)
}

func createMockProcedure(shardID int, clusterID uint64, clusterVersion uint64, typ procedure.Typ, procedures []procedure.Procedure) []procedure.Procedure {
shardWithVersion := map[storage.ShardID]uint64{}
shardWithVersion[storage.ShardID(shardID)] = 0
p := mockProcedure{
ClusterID: storage.ClusterID(clusterID),
clusterVersion: clusterVersion,
typ: typ,
ShardWithVersion: shardWithVersion,
}
procedures = append(procedures, p)
return procedures
}
2 changes: 1 addition & 1 deletion server/coordinator/scheduler/assign_shard_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func (a AssignShardScheduler) Schedule(ctx context.Context, clusterSnapshot meta
}

procedures = append(procedures, p)
reasons.WriteString(fmt.Sprintf("the shard is not assigned to any node, try to assign it to node, shardID:%d , node:%s. ", shardView.ShardID, newLeaderNode.Node.Name))
reasons.WriteString(fmt.Sprintf("the shard is not assigned to any node, try to assign it to node, shardID:%d, node:%s.", shardView.ShardID, newLeaderNode.Node.Name))
}

if len(procedures) == 0 {
Expand Down
2 changes: 1 addition & 1 deletion server/coordinator/scheduler/rebalanced_shard_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func (r RebalancedShardScheduler) Schedule(ctx context.Context, clusterSnapshot
return ScheduleResult{}, err
}
procedures = append(procedures, p)
reasons.WriteString(fmt.Sprintf("the shard:%d on the node:%s does not meet the balance requirements,it should be assigned to node:%s. ", shardNode.ID, shardNode.NodeName, node.Node.Name))
reasons.WriteString(fmt.Sprintf("the shard does not meet the balance requirements,it should be assigned to node, shardID:%d, oldNode:%s, newNode:%s.", shardNode.ID, shardNode.NodeName, node.Node.Name))
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func (s *StaticTopologyShardScheduler) Schedule(ctx context.Context, clusterSnap
return ScheduleResult{}, err
}
procedures = append(procedures, p)
reasons.WriteString(fmt.Sprintf("Cluster initialization, shard:%d is assigned to node:%s. ", shardView.ShardID, newLeaderNode.Node.Name))
reasons.WriteString(fmt.Sprintf("Cluster initialization, assign shard to node, shardID:%d, nodeName:%s. ", shardView.ShardID, newLeaderNode.Node.Name))
}
case storage.ClusterStateStable:
for i := 0; i < len(clusterSnapshot.Topology.ClusterView.ShardNodes); i++ {
Expand All @@ -74,7 +74,7 @@ func (s *StaticTopologyShardScheduler) Schedule(ctx context.Context, clusterSnap
return ScheduleResult{}, err
}
procedures = append(procedures, p)
reasons.WriteString(fmt.Sprintf("Cluster initialization, shard:%d is assigned to node:%s. ", shardNode.ID, node.Node.Name))
reasons.WriteString(fmt.Sprintf("Cluster initialization, assign shard to node, shardID:%d, nodeName:%s. ", shardNode.ID, node.Node.Name))
}
}
}
Expand Down

0 comments on commit 8980f4b

Please sign in to comment.