From bed0dbbcc7f4e0e6167d8d0484e0e7422be3539b Mon Sep 17 00:00:00 2001 From: CooooolFrog Date: Tue, 27 Jun 2023 11:04:45 +0800 Subject: [PATCH] refactor: refactor by cr --- server/coordinator/procedure/manager_impl.go | 1 + .../transferleader/batch_transfer_leader.go | 2 +- .../batch_transfer_leader_test.go | 40 +++++++------------ .../scheduler/assign_shard_scheduler.go | 2 +- .../scheduler/rebalanced_shard_scheduler.go | 2 +- .../scheduler/scheduler_manager.go | 6 +-- .../static_topology_shard_scheduler.go | 4 +- server/service/grpc/service.go | 2 +- 8 files changed, 24 insertions(+), 35 deletions(-) diff --git a/server/coordinator/procedure/manager_impl.go b/server/coordinator/procedure/manager_impl.go index ff37a82c..d60437d1 100644 --- a/server/coordinator/procedure/manager_impl.go +++ b/server/coordinator/procedure/manager_impl.go @@ -75,6 +75,7 @@ func (m *ManagerImpl) Stop(ctx context.Context) error { return nil } +// TODO: Filter duplicate submitted Procedure. func (m *ManagerImpl) Submit(_ context.Context, procedure Procedure) error { if err := m.waitingProcedures.Push(procedure, 0); err != nil { return err diff --git a/server/coordinator/procedure/operation/transferleader/batch_transfer_leader.go b/server/coordinator/procedure/operation/transferleader/batch_transfer_leader.go index 4aa4f54f..791ece36 100644 --- a/server/coordinator/procedure/operation/transferleader/batch_transfer_leader.go +++ b/server/coordinator/procedure/operation/transferleader/batch_transfer_leader.go @@ -63,7 +63,7 @@ func buildBatchRelatedVersionInfo(batch []procedure.Procedure) (procedure.Relate for shardID, version := range p.RelatedVersionInfo().ShardWithVersion { if resultVersion, exists := result.ShardWithVersion[shardID]; exists { if version != resultVersion { - return procedure.RelatedVersionInfo{}, errors.WithMessage(procedure.ErrMergeBatchProcedure, "procedure shardVersion in the same batch is inconsistent") + return procedure.RelatedVersionInfo{}, errors.WithMessage(procedure.ErrMergeBatchProcedure, fmt.Sprintf("procedure shardVersion in the same batch is inconsistent, shardID: %d, expetcdShardVersion: %d, shardVersion: %d", shardID, version, resultVersion)) } } else { result.ShardWithVersion[shardID] = version diff --git a/server/coordinator/procedure/operation/transferleader/batch_transfer_leader_test.go b/server/coordinator/procedure/operation/transferleader/batch_transfer_leader_test.go index ba72b03b..4838ccbd 100644 --- a/server/coordinator/procedure/operation/transferleader/batch_transfer_leader_test.go +++ b/server/coordinator/procedure/operation/transferleader/batch_transfer_leader_test.go @@ -53,34 +53,23 @@ func (m mockProcedure) Priority() procedure.Priority { func TestBatchProcedure(t *testing.T) { re := require.New(t) - var procedures []procedure.Procedure // Procedures with same type and version. for i := 0; i < 3; i++ { shardWithVersion := map[storage.ShardID]uint64{} shardWithVersion[storage.ShardID(i)] = 0 - p := mockProcedure{ - ClusterID: 0, - clusterVersion: 0, - typ: 0, - ShardWithVersion: shardWithVersion, - } + p := CreateMockProcedure(storage.ClusterID(0), 0, 0, shardWithVersion) procedures = append(procedures, p) } _, err := transferleader.NewBatchTransferLeaderProcedure(0, procedures) re.NoError(err) - // Procedure with different clusterID + // Procedure with different clusterID. for i := 0; i < 3; i++ { shardWithVersion := map[storage.ShardID]uint64{} shardWithVersion[storage.ShardID(i)] = 0 - p := mockProcedure{ - ClusterID: storage.ClusterID(i), - clusterVersion: 0, - typ: procedure.TransferLeader, - ShardWithVersion: shardWithVersion, - } + p := CreateMockProcedure(storage.ClusterID(i), 0, procedure.TransferLeader, shardWithVersion) procedures = append(procedures, p) } _, err = transferleader.NewBatchTransferLeaderProcedure(0, procedures) @@ -90,12 +79,7 @@ func TestBatchProcedure(t *testing.T) { for i := 0; i < 3; i++ { shardWithVersion := map[storage.ShardID]uint64{} shardWithVersion[storage.ShardID(i)] = 0 - p := mockProcedure{ - ClusterID: 0, - clusterVersion: 0, - typ: procedure.Typ(i), - ShardWithVersion: shardWithVersion, - } + p := CreateMockProcedure(0, 0, procedure.Typ(i), shardWithVersion) procedures = append(procedures, p) } _, err = transferleader.NewBatchTransferLeaderProcedure(0, procedures) @@ -105,14 +89,18 @@ func TestBatchProcedure(t *testing.T) { for i := 0; i < 3; i++ { shardWithVersion := map[storage.ShardID]uint64{} shardWithVersion[storage.ShardID(0)] = uint64(i) - p := mockProcedure{ - ClusterID: 0, - clusterVersion: 0, - typ: procedure.Typ(i), - ShardWithVersion: shardWithVersion, - } + p := CreateMockProcedure(0, 0, procedure.Typ(i), shardWithVersion) procedures = append(procedures, p) } _, err = transferleader.NewBatchTransferLeaderProcedure(0, procedures) re.Error(err) } + +func CreateMockProcedure(clusterID storage.ClusterID, clusterVersion uint64, typ procedure.Typ, shardWithVersion map[storage.ShardID]uint64) procedure.Procedure { + return mockProcedure{ + ClusterID: clusterID, + clusterVersion: clusterVersion, + typ: typ, + ShardWithVersion: shardWithVersion, + } +} diff --git a/server/coordinator/scheduler/assign_shard_scheduler.go b/server/coordinator/scheduler/assign_shard_scheduler.go index 1856a643..049fdf9c 100644 --- a/server/coordinator/scheduler/assign_shard_scheduler.go +++ b/server/coordinator/scheduler/assign_shard_scheduler.go @@ -55,7 +55,7 @@ func (a AssignShardScheduler) Schedule(ctx context.Context, clusterSnapshot meta } procedures = append(procedures, p) - reasons.WriteString(fmt.Sprintf("the shard:%d is not assigned to any node, try to assign it to node:%s", shardView.ShardID, newLeaderNode.Node.Name)) + reasons.WriteString(fmt.Sprintf("the shard is not assigned to any node, try to assign it to node, shardID:%d, node:%s.", shardView.ShardID, newLeaderNode.Node.Name)) } if len(procedures) == 0 { diff --git a/server/coordinator/scheduler/rebalanced_shard_scheduler.go b/server/coordinator/scheduler/rebalanced_shard_scheduler.go index c4095747..66f032f6 100644 --- a/server/coordinator/scheduler/rebalanced_shard_scheduler.go +++ b/server/coordinator/scheduler/rebalanced_shard_scheduler.go @@ -53,7 +53,7 @@ func (r RebalancedShardScheduler) Schedule(ctx context.Context, clusterSnapshot return ScheduleResult{}, err } procedures = append(procedures, p) - reasons.WriteString(fmt.Sprintf("the shard:%d on the node:%s does not meet the balance requirements,it should be assigned to node:%s \n", shardNode.ID, shardNode.NodeName, node.Node.Name)) + reasons.WriteString(fmt.Sprintf("the shard does not meet the balance requirements,it should be assigned to node, shardID:%d, oldNode:%s, newNode:%s.", shardNode.ID, shardNode.NodeName, node.Node.Name)) } } diff --git a/server/coordinator/scheduler/scheduler_manager.go b/server/coordinator/scheduler/scheduler_manager.go index 833d48a3..c3312c69 100644 --- a/server/coordinator/scheduler/scheduler_manager.go +++ b/server/coordinator/scheduler/scheduler_manager.go @@ -213,16 +213,16 @@ func (m *ManagerImpl) ListScheduler() []Scheduler { func (m *ManagerImpl) Scheduler(ctx context.Context, clusterSnapshot metadata.Snapshot) []ScheduleResult { // TODO: Every scheduler should run in an independent goroutine. - allResults := make([]ScheduleResult, 0, len(m.registerSchedulers)) + results := make([]ScheduleResult, 0, len(m.registerSchedulers)) for _, scheduler := range m.registerSchedulers { result, err := scheduler.Schedule(ctx, clusterSnapshot) if err != nil { m.logger.Error("scheduler failed", zap.Error(err)) continue } - allResults = append(allResults, result) + results = append(results, result) } - return allResults + return results } func (m *ManagerImpl) UpdateEnableSchedule(_ context.Context, enableSchedule bool) { diff --git a/server/coordinator/scheduler/static_topology_shard_scheduler.go b/server/coordinator/scheduler/static_topology_shard_scheduler.go index 2b9105e8..636f1a73 100644 --- a/server/coordinator/scheduler/static_topology_shard_scheduler.go +++ b/server/coordinator/scheduler/static_topology_shard_scheduler.go @@ -53,7 +53,7 @@ func (s *StaticTopologyShardScheduler) Schedule(ctx context.Context, clusterSnap return ScheduleResult{}, err } procedures = append(procedures, p) - reasons.WriteString(fmt.Sprintf("Cluster initialization, shard:%d is assigned to node:%s \n", shardView.ShardID, newLeaderNode.Node.Name)) + reasons.WriteString(fmt.Sprintf("Cluster initialization, assign shard to node, shardID:%d, nodeName:%s. ", shardView.ShardID, newLeaderNode.Node.Name)) } case storage.ClusterStateStable: for i := 0; i < len(clusterSnapshot.Topology.ClusterView.ShardNodes); i++ { @@ -74,7 +74,7 @@ func (s *StaticTopologyShardScheduler) Schedule(ctx context.Context, clusterSnap return ScheduleResult{}, err } procedures = append(procedures, p) - reasons.WriteString(fmt.Sprintf("Cluster initialization, shard:%d is assigned to node:%s \n", shardNode.ID, node.Node.Name)) + reasons.WriteString(fmt.Sprintf("Cluster initialization, assign shard to node, shardID:%d, nodeName:%s. ", shardNode.ID, node.Node.Name)) } } } diff --git a/server/service/grpc/service.go b/server/service/grpc/service.go index 94945f1f..692ffd62 100644 --- a/server/service/grpc/service.go +++ b/server/service/grpc/service.go @@ -297,7 +297,7 @@ func (s *Service) RouteTables(ctx context.Context, req *metaservicepb.RouteTable return &metaservicepb.RouteTablesResponse{Header: responseHeader(err, "grpc routeTables")}, nil } - log.Info("[RouteTable]", zap.String("schemaName", req.SchemaName), zap.String("clusterName", req.GetHeader().ClusterName), zap.String("tableNames", strings.Join(req.TableNames, ","))) + log.Debug("[RouteTable]", zap.String("schemaName", req.SchemaName), zap.String("clusterName", req.GetHeader().ClusterName), zap.String("tableNames", strings.Join(req.TableNames, ","))) // Forward request to the leader. if ceresmetaClient != nil {