
Commit 3f49fb8
refactor: refactor by cr
ZuLiangWang committed Jul 4, 2023
1 parent d978368 commit 3f49fb8
Showing 7 changed files with 23 additions and 34 deletions.
1 change: 1 addition & 0 deletions server/coordinator/procedure/manager_impl.go
@@ -75,6 +75,7 @@ func (m *ManagerImpl) Stop(ctx context.Context) error {
return nil
}

+// TODO: Filter duplicate submitted Procedure.
func (m *ManagerImpl) Submit(_ context.Context, procedure Procedure) error {
if err := m.waitingProcedures.Push(procedure, 0); err != nil {
return err
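The TODO added above hints at the intended follow-up: Submit should reject a procedure that is already waiting in the queue. A minimal sketch of such a check, assuming a hypothetical Procedure.ID() accessor and a queuedIDs set guarded by a mutex on the manager (none of these are confirmed by this commit):

// Sketch only: queuedIDs, lock, and Procedure.ID() are assumptions,
// not part of the code shown in this diff.
func (m *ManagerImpl) Submit(_ context.Context, procedure Procedure) error {
	m.lock.Lock()
	defer m.lock.Unlock()

	if _, queued := m.queuedIDs[procedure.ID()]; queued {
		// Drop the duplicate instead of queueing it twice.
		return nil
	}
	if err := m.waitingProcedures.Push(procedure, 0); err != nil {
		return err
	}
	m.queuedIDs[procedure.ID()] = struct{}{}
	return nil
}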
@@ -63,7 +63,7 @@ func buildBatchRelatedVersionInfo(batch []procedure.Procedure) (procedure.Relate
for shardID, version := range p.RelatedVersionInfo().ShardWithVersion {
if resultVersion, exists := result.ShardWithVersion[shardID]; exists {
if version != resultVersion {
-return procedure.RelatedVersionInfo{}, errors.WithMessage(procedure.ErrMergeBatchProcedure, "procedure shardVersion in the same batch is inconsistent")
+return procedure.RelatedVersionInfo{}, errors.WithMessage(procedure.ErrMergeBatchProcedure, fmt.Sprintf("procedure shardVersion in the same batch is inconsistent, shardID: %d, expectedShardVersion: %d, shardVersion: %d", shardID, version, resultVersion))
}
} else {
result.ShardWithVersion[shardID] = version
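The hunk above enriches the error raised while merging per-shard versions across a batch: every procedure in the batch must agree on each shard's version. A self-contained sketch of that merge rule, with the types collapsed to plain uint64 maps (illustrative names, not the repository's API):

package main

import "fmt"

// mergeShardVersions folds each procedure's shard->version map into one
// result, failing when two procedures disagree on a shard's version.
func mergeShardVersions(batch []map[uint64]uint64) (map[uint64]uint64, error) {
	result := map[uint64]uint64{}
	for _, shardWithVersion := range batch {
		for shardID, version := range shardWithVersion {
			if resultVersion, exists := result[shardID]; exists {
				if version != resultVersion {
					return nil, fmt.Errorf("inconsistent shardVersion in the same batch, shardID: %d, expectedShardVersion: %d, shardVersion: %d", shardID, resultVersion, version)
				}
				continue
			}
			result[shardID] = version
		}
	}
	return result, nil
}

func main() {
	// Two procedures touch shard 0 with different versions: rejected.
	_, err := mergeShardVersions([]map[uint64]uint64{{0: 1}, {0: 2}})
	fmt.Println(err)
}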
@@ -53,34 +53,23 @@ func (m mockProcedure) Priority() procedure.Priority {

func TestBatchProcedure(t *testing.T) {
re := require.New(t)

var procedures []procedure.Procedure

// Procedures with same type and version.
for i := 0; i < 3; i++ {
shardWithVersion := map[storage.ShardID]uint64{}
shardWithVersion[storage.ShardID(i)] = 0
-p := mockProcedure{
-ClusterID: 0,
-clusterVersion: 0,
-typ: 0,
-ShardWithVersion: shardWithVersion,
-}
+p := CreateMockProcedure(storage.ClusterID(0), 0, 0, shardWithVersion)
procedures = append(procedures, p)
}
_, err := transferleader.NewBatchTransferLeaderProcedure(0, procedures)
re.NoError(err)

-// Procedure with different clusterID
+// Procedure with different clusterID.
for i := 0; i < 3; i++ {
shardWithVersion := map[storage.ShardID]uint64{}
shardWithVersion[storage.ShardID(i)] = 0
-p := mockProcedure{
-ClusterID: storage.ClusterID(i),
-clusterVersion: 0,
-typ: procedure.TransferLeader,
-ShardWithVersion: shardWithVersion,
-}
+p := CreateMockProcedure(storage.ClusterID(i), 0, procedure.TransferLeader, shardWithVersion)
procedures = append(procedures, p)
}
_, err = transferleader.NewBatchTransferLeaderProcedure(0, procedures)
@@ -90,12 +79,7 @@ func TestBatchProcedure(t *testing.T) {
for i := 0; i < 3; i++ {
shardWithVersion := map[storage.ShardID]uint64{}
shardWithVersion[storage.ShardID(i)] = 0
-p := mockProcedure{
-ClusterID: 0,
-clusterVersion: 0,
-typ: procedure.Typ(i),
-ShardWithVersion: shardWithVersion,
-}
+p := CreateMockProcedure(0, 0, procedure.Typ(i), shardWithVersion)
procedures = append(procedures, p)
}
_, err = transferleader.NewBatchTransferLeaderProcedure(0, procedures)
@@ -105,14 +89,18 @@ func TestBatchProcedure(t *testing.T) {
for i := 0; i < 3; i++ {
shardWithVersion := map[storage.ShardID]uint64{}
shardWithVersion[storage.ShardID(0)] = uint64(i)
-p := mockProcedure{
-ClusterID: 0,
-clusterVersion: 0,
-typ: procedure.Typ(i),
-ShardWithVersion: shardWithVersion,
-}
+p := CreateMockProcedure(0, 0, procedure.Typ(i), shardWithVersion)
procedures = append(procedures, p)
}
_, err = transferleader.NewBatchTransferLeaderProcedure(0, procedures)
re.Error(err)
}

+func CreateMockProcedure(clusterID storage.ClusterID, clusterVersion uint64, typ procedure.Typ, shardWithVersion map[storage.ShardID]uint64) procedure.Procedure {
+return mockProcedure{
+ClusterID: clusterID,
+clusterVersion: clusterVersion,
+typ: typ,
+ShardWithVersion: shardWithVersion,
+}
+}
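Extracting CreateMockProcedure removes four copies of the same struct literal. Taken a step further, the loops themselves could collapse into a table-driven test; a hedged sketch of that direction (not part of this commit, and simplified to a single procedures slice):

// Sketch only: a table-driven variant of the duplicated loops above.
cases := []struct {
	name      string
	clusterID storage.ClusterID
	typ       procedure.Typ
}{
	{name: "same cluster", clusterID: 0, typ: procedure.TransferLeader},
	{name: "different cluster", clusterID: 1, typ: procedure.TransferLeader},
}
for _, c := range cases {
	shardWithVersion := map[storage.ShardID]uint64{0: 0}
	procedures = append(procedures, CreateMockProcedure(c.clusterID, 0, c.typ, shardWithVersion))
}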
2 changes: 1 addition & 1 deletion server/coordinator/scheduler/assign_shard_scheduler.go
@@ -55,7 +55,7 @@ func (a AssignShardScheduler) Schedule(ctx context.Context, clusterSnapshot meta
}

procedures = append(procedures, p)
reasons.WriteString(fmt.Sprintf("the shard:%d is not assigned to any node, try to assign it to node:%s", shardView.ShardID, newLeaderNode.Node.Name))
reasons.WriteString(fmt.Sprintf("the shard is not assigned to any node, try to assign it to node, shardID:%d, node:%s.", shardView.ShardID, newLeaderNode.Node.Name))
}

if len(procedures) == 0 {
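This and the following message rewrites all follow one convention: a human-readable sentence first, then the identifying fields as trailing key:value pairs, accumulated into the reasons strings.Builder behind the ScheduleResult. A standalone illustration of the pattern (values are made up):

package main

import (
	"fmt"
	"strings"
)

func main() {
	var reasons strings.Builder
	// Prose first, then key:value fields, as in the rewritten messages.
	reasons.WriteString(fmt.Sprintf("the shard is not assigned to any node, try to assign it to node, shardID:%d, node:%s.", 3, "node-0"))
	reasons.WriteString(fmt.Sprintf("the shard is not assigned to any node, try to assign it to node, shardID:%d, node:%s.", 4, "node-1"))
	fmt.Println(reasons.String())
}

Note that the rewritten messages drop the old trailing \n; only the static-topology variant below keeps a trailing space as a separator, so consecutive reasons otherwise run together.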
2 changes: 1 addition & 1 deletion server/coordinator/scheduler/rebalanced_shard_scheduler.go
@@ -53,7 +53,7 @@ func (r RebalancedShardScheduler) Schedule(ctx context.Context, clusterSnapshot
return ScheduleResult{}, err
}
procedures = append(procedures, p)
reasons.WriteString(fmt.Sprintf("the shard:%d on the node:%s does not meet the balance requirements,it should be assigned to node:%s \n", shardNode.ID, shardNode.NodeName, node.Node.Name))
reasons.WriteString(fmt.Sprintf("the shard does not meet the balance requirements,it should be assigned to node, shardID:%d, oldNode:%s, newNode:%s.", shardNode.ID, shardNode.NodeName, node.Node.Name))
}
}

6 changes: 3 additions & 3 deletions server/coordinator/scheduler/scheduler_manager.go
@@ -213,16 +213,16 @@ func (m *ManagerImpl) ListScheduler() []Scheduler {

func (m *ManagerImpl) Scheduler(ctx context.Context, clusterSnapshot metadata.Snapshot) []ScheduleResult {
// TODO: Every scheduler should run in an independent goroutine.
-allResults := make([]ScheduleResult, 0, len(m.registerSchedulers))
+results := make([]ScheduleResult, 0, len(m.registerSchedulers))
for _, scheduler := range m.registerSchedulers {
result, err := scheduler.Schedule(ctx, clusterSnapshot)
if err != nil {
m.logger.Error("scheduler failed", zap.Error(err))
continue
}
-allResults = append(allResults, result)
+results = append(results, result)
}
-return allResults
+return results
}

func (m *ManagerImpl) UpdateEnableSchedule(_ context.Context, enableSchedule bool) {
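The TODO at the top of this hunk says each scheduler should eventually run in its own goroutine. A hedged sketch of what that concurrent variant might look like, reusing only names visible in this file (registerSchedulers, Schedule, ScheduleResult, m.logger) and assuming imports of sync and go.uber.org/zap; this is not the actual implementation:

// Sketch only: a concurrent take on ManagerImpl.Scheduler.
func (m *ManagerImpl) Scheduler(ctx context.Context, clusterSnapshot metadata.Snapshot) []ScheduleResult {
	var (
		wg      sync.WaitGroup
		mu      sync.Mutex
		results = make([]ScheduleResult, 0, len(m.registerSchedulers))
	)
	for _, scheduler := range m.registerSchedulers {
		scheduler := scheduler // capture the loop variable (pre-Go 1.22)
		wg.Add(1)
		go func() {
			defer wg.Done()
			result, err := scheduler.Schedule(ctx, clusterSnapshot)
			if err != nil {
				m.logger.Error("scheduler failed", zap.Error(err))
				return
			}
			mu.Lock()
			results = append(results, result)
			mu.Unlock()
		}()
	}
	wg.Wait()
	return results
}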
@@ -53,7 +53,7 @@ func (s *StaticTopologyShardScheduler) Schedule(ctx context.Context, clusterSnap
return ScheduleResult{}, err
}
procedures = append(procedures, p)
reasons.WriteString(fmt.Sprintf("Cluster initialization, shard:%d is assigned to node:%s \n", shardView.ShardID, newLeaderNode.Node.Name))
reasons.WriteString(fmt.Sprintf("Cluster initialization, assign shard to node, shardID:%d, nodeName:%s. ", shardView.ShardID, newLeaderNode.Node.Name))
}
case storage.ClusterStateStable:
for i := 0; i < len(clusterSnapshot.Topology.ClusterView.ShardNodes); i++ {
@@ -74,7 +74,7 @@ func (s *StaticTopologyShardScheduler) Schedule(ctx context.Context, clusterSnap
return ScheduleResult{}, err
}
procedures = append(procedures, p)
reasons.WriteString(fmt.Sprintf("Cluster initialization, shard:%d is assigned to node:%s \n", shardNode.ID, node.Node.Name))
reasons.WriteString(fmt.Sprintf("Cluster initialization, assign shard to node, shardID:%d, nodeName:%s. ", shardNode.ID, node.Node.Name))
}
}
}
