diff --git a/server/coordinator/scheduler/error.go b/server/coordinator/scheduler/error.go new file mode 100644 index 00000000..a2be2aef --- /dev/null +++ b/server/coordinator/scheduler/error.go @@ -0,0 +1,7 @@ +package scheduler + +import "github.com/CeresDB/ceresmeta/pkg/coderr" + +var ( + ErrInvalidTopologyType = coderr.NewCodeError(coderr.Internal, "invalid topology type") +) diff --git a/server/coordinator/scheduler/rebalanced_shard_scheduler.go b/server/coordinator/scheduler/rebalanced_shard_scheduler.go index 7b4b05df..3d5999c1 100644 --- a/server/coordinator/scheduler/rebalanced_shard_scheduler.go +++ b/server/coordinator/scheduler/rebalanced_shard_scheduler.go @@ -6,6 +6,7 @@ import ( "context" "fmt" "strings" + "sync" "github.com/CeresDB/ceresmeta/pkg/assert" "github.com/CeresDB/ceresmeta/server/cluster/metadata" @@ -20,10 +21,17 @@ type RebalancedShardScheduler struct { factory *coordinator.Factory nodePicker coordinator.NodePicker procedureExecutingBatchSize uint32 + + // Mutex is used to protect following fields. + lock sync.Mutex + // latestShardNodeMapping is used to record last stable shard topology, + // when deployMode is true, rebalancedShardScheduler will recover cluster according to the topology. 
+ latestShardNodeMapping map[storage.ShardID]metadata.RegisteredNode + deployMode bool } func NewRebalancedShardScheduler(logger *zap.Logger, factory *coordinator.Factory, nodePicker coordinator.NodePicker, procedureExecutingBatchSize uint32) Scheduler { - return RebalancedShardScheduler{ + return &RebalancedShardScheduler{ logger: logger, factory: factory, nodePicker: nodePicker, @@ -31,7 +39,7 @@ func NewRebalancedShardScheduler(logger *zap.Logger, factory *coordinator.Factor } } -func (r RebalancedShardScheduler) Schedule(ctx context.Context, clusterSnapshot metadata.Snapshot) (ScheduleResult, error) { +func (r *RebalancedShardScheduler) Schedule(ctx context.Context, clusterSnapshot metadata.Snapshot) (ScheduleResult, error) { // RebalancedShardScheduler can only be scheduled when the cluster is not empty. if clusterSnapshot.Topology.ClusterView.State == storage.ClusterStateEmpty { return ScheduleResult{}, nil @@ -45,20 +53,30 @@ func (r RebalancedShardScheduler) Schedule(ctx context.Context, clusterSnapshot shardIDs = append(shardIDs, shardID) } numShards := uint32(len(clusterSnapshot.Topology.ShardViewsMapping)) - shardNodeMapping, err := r.nodePicker.PickNode(ctx, shardIDs, numShards, clusterSnapshot.RegisteredNodes) - if err != nil { - return ScheduleResult{}, err + + // ShardNodeMapping only update when deployMode is false. + if !r.deployMode { + newShardNodeMapping, err := r.nodePicker.PickNode(ctx, shardIDs, numShards, clusterSnapshot.RegisteredNodes) + if err != nil { + return ScheduleResult{}, err + } + r.updateShardNodeMapping(newShardNodeMapping) } + // Generate assigned shards mapping and transfer leader if node is changed. 
assignedShardIDs := make(map[storage.ShardID]struct{}, numShards) for _, shardNode := range clusterSnapshot.Topology.ClusterView.ShardNodes { + if len(procedures) >= int(r.procedureExecutingBatchSize) { + r.logger.Warn("procedure length reached procedure executing batch size", zap.Uint32("procedureExecutingBatchSize", r.procedureExecutingBatchSize)) + break + } + // Mark the shard assigned. assignedShardIDs[shardNode.ID] = struct{}{} - - newLeaderNode, ok := shardNodeMapping[shardNode.ID] + newLeaderNode, ok := r.latestShardNodeMapping[shardNode.ID] assert.Assert(ok) if newLeaderNode.Node.Name != shardNode.NodeName { - r.logger.Info("rebalanced shard scheduler generates transfer leader procedure", zap.Uint64("shardID", uint64(shardNode.ID)), zap.String("originNode", shardNode.NodeName), zap.String("newNode", newLeaderNode.Node.Name)) + r.logger.Info("rebalanced shard scheduler try to assign shard to another node", zap.Uint64("shardID", uint64(shardNode.ID)), zap.String("originNode", shardNode.NodeName), zap.String("newNode", newLeaderNode.Node.Name)) p, err := r.factory.CreateTransferLeaderProcedure(ctx, coordinator.TransferLeaderRequest{ Snapshot: clusterSnapshot, ShardID: shardNode.ID, @@ -68,21 +86,25 @@ func (r RebalancedShardScheduler) Schedule(ctx context.Context, clusterSnapshot if err != nil { return ScheduleResult{}, err } + procedures = append(procedures, p) reasons.WriteString(fmt.Sprintf("shard is transferred to another node, shardID:%d, oldNode:%s, newNode:%s\n", shardNode.ID, shardNode.NodeName, newLeaderNode.Node.Name)) - if len(procedures) >= int(r.procedureExecutingBatchSize) { - break - } } } + // Check whether the assigned shard needs to be reopened. 
for id := uint32(0); id < numShards; id++ { + if len(procedures) >= int(r.procedureExecutingBatchSize) { + r.logger.Warn("procedure length reached procedure executing batch size", zap.Uint32("procedureExecutingBatchSize", r.procedureExecutingBatchSize)) + break + } + + shardID := storage.ShardID(id) if _, assigned := assignedShardIDs[shardID]; !assigned { - node, ok := shardNodeMapping[shardID] + node, ok := r.latestShardNodeMapping[shardID] assert.Assert(ok) - r.logger.Info("rebalanced shard scheduler generates transfer leader procedure (assign to node)", zap.Uint32("shardID", id), zap.String("node", node.Node.Name)) + r.logger.Info("rebalanced shard scheduler try to assign unassigned shard to node", zap.Uint32("shardID", id), zap.String("node", node.Node.Name)) p, err := r.factory.CreateTransferLeaderProcedure(ctx, coordinator.TransferLeaderRequest{ Snapshot: clusterSnapshot, ShardID: shardID, @@ -95,9 +117,6 @@ func (r RebalancedShardScheduler) Schedule(ctx context.Context, clusterSnapshot procedures = append(procedures, p) reasons.WriteString(fmt.Sprintf("shard is assigned to a node, shardID:%d, node:%s\n", shardID, node.Node.Name)) - if len(procedures) >= int(r.procedureExecutingBatchSize) { - break - } } } @@ -115,3 +134,17 @@ func (r RebalancedShardScheduler) Schedule(ctx context.Context, clusterSnapshot return ScheduleResult{batchProcedure, reasons.String()}, nil } + +func (r *RebalancedShardScheduler) updateShardNodeMapping(newShardNodeMapping map[storage.ShardID]metadata.RegisteredNode) { + r.lock.Lock() + defer r.lock.Unlock() + + r.latestShardNodeMapping = newShardNodeMapping +} + +func (r *RebalancedShardScheduler) UpdateDeployMode(_ context.Context, enable bool) { + r.lock.Lock() + defer r.lock.Unlock() + + r.deployMode = enable +} diff --git a/server/coordinator/scheduler/scheduler.go b/server/coordinator/scheduler/scheduler.go index 4f1aeff5..eee9ca71 100644 --- a/server/coordinator/scheduler/scheduler.go +++ b/server/coordinator/scheduler/scheduler.go @@ -18,4 
+18,8 @@ type ScheduleResult struct { type Scheduler interface { // Schedule will generate procedure based on current cluster snapshot, which will be submitted to ProcedureManager, and whether it is actually executed depends on the current state of ProcedureManager. Schedule(ctx context.Context, clusterSnapshot metadata.Snapshot) (ScheduleResult, error) + + // UpdateDeployMode is used to update deployMode for scheduler, + // DeployMode means that the cluster topology is locked and the mapping between shards and nodes cannot be changed. + UpdateDeployMode(ctx context.Context, enable bool) } diff --git a/server/coordinator/scheduler/scheduler_manager.go b/server/coordinator/scheduler/scheduler_manager.go index 2b34f15b..ce2b31b5 100644 --- a/server/coordinator/scheduler/scheduler_manager.go +++ b/server/coordinator/scheduler/scheduler_manager.go @@ -21,8 +21,7 @@ import ( ) const ( - schedulerInterval = time.Second * 5 - defaultHashReplicas = 50 + schedulerInterval = time.Second * 5 ) // Manager used to manage schedulers, it will register all schedulers when it starts. @@ -31,12 +30,21 @@ const ( type Manager interface { ListScheduler() []Scheduler + GetScheduler() Scheduler + Start(ctx context.Context) error Stop(ctx context.Context) error UpdateEnableSchedule(ctx context.Context, enableSchedule bool) + // UpdateDeployMode can only be used in dynamic mode, it will throw error when topology type is static. + // when deploy mode is true, shard topology will not be updated, it is usually used in scenarios such as cluster deploy. + UpdateDeployMode(ctx context.Context, enable bool) error + + // GetDeployMode can only be used in dynamic mode, it will throw error when topology type is static. + GetDeployMode(ctx context.Context) (bool, error) + // Scheduler will be called when received new heartbeat, every scheduler registered in schedulerManager will be called to generate procedures. // Scheduler cloud be schedule with fix time interval or heartbeat. 
Scheduler(ctx context.Context, clusterSnapshot metadata.Snapshot) []ScheduleResult @@ -59,6 +67,7 @@ type ManagerImpl struct { enableSchedule bool topologyType storage.TopologyType procedureExecutingBatchSize uint32 + deployMode bool } func NewManager(logger *zap.Logger, procedureManager procedure.Manager, factory *coordinator.Factory, clusterMetadata *metadata.ClusterMetadata, client *clientv3.Client, rootPath string, enableSchedule bool, topologyType storage.TopologyType, procedureExecutingBatchSize uint32) Manager { @@ -215,6 +224,11 @@ func (m *ManagerImpl) ListScheduler() []Scheduler { return m.registerSchedulers } +func (m *ManagerImpl) GetScheduler() Scheduler { + //TODO implement me + panic("implement me") +} + func (m *ManagerImpl) Scheduler(ctx context.Context, clusterSnapshot metadata.Snapshot) []ScheduleResult { // TODO: Every scheduler should run in an independent goroutine. results := make([]ScheduleResult, 0, len(m.registerSchedulers)) @@ -236,3 +250,30 @@ func (m *ManagerImpl) UpdateEnableSchedule(_ context.Context, enableSchedule boo m.logger.Info("scheduler manager update enableSchedule", zap.Bool("enableSchedule", enableSchedule)) } + +func (m *ManagerImpl) UpdateDeployMode(ctx context.Context, enable bool) error { + m.lock.Lock() + defer m.lock.Unlock() + + if m.topologyType != storage.TopologyTypeDynamic { + return errors.WithMessage(ErrInvalidTopologyType, "deploy mode could only update when topology type is dynamic") + } + + m.deployMode = enable + for _, scheduler := range m.registerSchedulers { + scheduler.UpdateDeployMode(ctx, enable) + } + + return nil +} + +func (m *ManagerImpl) GetDeployMode(_ context.Context) (bool, error) { + m.lock.RLock() + defer m.lock.RUnlock() + + if m.topologyType != storage.TopologyTypeDynamic { + return false, errors.WithMessage(ErrInvalidTopologyType, "deploy mode could only get when topology type is dynamic") + } + + return m.deployMode, nil +} diff --git a/server/service/http/api.go 
b/server/service/http/api.go index ac3f3b0b..53cd211c 100644 --- a/server/service/http/api.go +++ b/server/service/http/api.go @@ -56,6 +56,8 @@ func (a *API) NewAPIRouter() *Router { router.Put(fmt.Sprintf("/cluster/:%s", clusterNameParam), wrap(a.updateCluster, true, a.forwardClient)) router.Get(fmt.Sprintf("/cluster/:%s/procedure", clusterNameParam), wrap(a.listProcedures, true, a.forwardClient)) router.Post("/table/query", wrap(a.queryTable, true, a.forwardClient)) + router.Get(fmt.Sprintf("/cluster/:%s/deployMode", clusterNameParam), wrap(a.getDeployMode, true, a.forwardClient)) + router.Put(fmt.Sprintf("/cluster/:%s/deployMode", clusterNameParam), wrap(a.updateDeployMode, true, a.forwardClient)) // Register debug API. router.GetWithoutPrefix("/debug/pprof/profile", pprof.Profile) @@ -408,6 +410,52 @@ func (a *API) queryTable(r *http.Request) apiFuncResult { return okResult(tables) } +func (a *API) getDeployMode(r *http.Request) apiFuncResult { + ctx := r.Context() + clusterName := Param(ctx, clusterNameParam) + if len(clusterName) == 0 { + clusterName = config.DefaultClusterName + } + + c, err := a.clusterManager.GetCluster(ctx, clusterName) + if err != nil { + return errResult(ErrGetCluster, fmt.Sprintf("clusterName: %s, err: %s", clusterName, err.Error())) + } + + deployMode, err := c.GetSchedulerManager().GetDeployMode(r.Context()) + if err != nil { + return errResult(ErrGetDeployMode, err.Error()) + } + + return okResult(deployMode) +} + +func (a *API) updateDeployMode(r *http.Request) apiFuncResult { + ctx := r.Context() + clusterName := Param(ctx, clusterNameParam) + if len(clusterName) == 0 { + clusterName = config.DefaultClusterName + } + + c, err := a.clusterManager.GetCluster(ctx, clusterName) + if err != nil { + return errResult(ErrGetCluster, fmt.Sprintf("clusterName: %s, err: %s", clusterName, err.Error())) + } + + var req UpdateDeployModeRequest + err = json.NewDecoder(r.Body).Decode(&req) + if err != nil { + return errResult(ErrParseRequest, 
err.Error()) + } + + err = c.GetSchedulerManager().UpdateDeployMode(r.Context(), req.Enable) + if err != nil { + return errResult(ErrUpdateDeployMode, err.Error()) + } + + return okResult(req.Enable) +} + func (a *API) diagnoseShards(req *http.Request) apiFuncResult { ctx := req.Context() clusterName := Param(ctx, clusterNameParam) diff --git a/server/service/http/error.go b/server/service/http/error.go index 94e16578..c60e9fd1 100644 --- a/server/service/http/error.go +++ b/server/service/http/error.go @@ -18,6 +18,8 @@ var ( ErrHealthCheck = coderr.NewCodeError(coderr.Internal, "server health check") ErrParseTopology = coderr.NewCodeError(coderr.Internal, "parse topology type") ErrUpdateFlowLimiter = coderr.NewCodeError(coderr.Internal, "update flow limiter") + ErrGetDeployMode = coderr.NewCodeError(coderr.Internal, "get deploy mode") + ErrUpdateDeployMode = coderr.NewCodeError(coderr.Internal, "update deploy mode") ErrAddLearner = coderr.NewCodeError(coderr.Internal, "add member as learner") ErrListMembers = coderr.NewCodeError(coderr.Internal, "get member list") ErrRemoveMembers = coderr.NewCodeError(coderr.Internal, "remove member") diff --git a/server/service/http/types.go b/server/service/http/types.go index f2a8497e..a352839c 100644 --- a/server/service/http/types.go +++ b/server/service/http/types.go @@ -137,3 +137,7 @@ type UpdateFlowLimiterRequest struct { Burst int `json:"burst"` Enable bool `json:"enable"` } + +type UpdateDeployModeRequest struct { + Enable bool `json:"enable"` +}