This repository has been archived by the owner on Feb 6, 2024. It is now read-only.

Commit

add deploy mode

ZuLiangWang committed Oct 7, 2023
1 parent 7095a6a commit b01e3cf
Showing 9 changed files with 169 additions and 18 deletions.
7 changes: 7 additions & 0 deletions server/coordinator/scheduler/error.go
@@ -0,0 +1,7 @@
package scheduler

import "github.com/CeresDB/ceresmeta/pkg/coderr"

var (
ErrInvalidTopologyType = coderr.NewCodeError(coderr.Internal, "invalid topology type")
)
69 changes: 53 additions & 16 deletions server/coordinator/scheduler/rebalanced_shard_scheduler.go
@@ -6,6 +6,7 @@ import (
"context"
"fmt"
"strings"
"sync"

"github.com/CeresDB/ceresmeta/pkg/assert"
"github.com/CeresDB/ceresmeta/server/cluster/metadata"
@@ -20,18 +21,29 @@ type RebalancedShardScheduler struct {
factory *coordinator.Factory
nodePicker coordinator.NodePicker
procedureExecutingBatchSize uint32

// The lock protects the following fields.
lock sync.Mutex
// latestShardNodeMapping records the last stable shard topology.
// When deployMode is enabled, RebalancedShardScheduler restores the cluster according to this topology.
latestShardNodeMapping map[storage.ShardID]metadata.RegisteredNode
deployMode bool
}

func NewRebalancedShardScheduler(logger *zap.Logger, factory *coordinator.Factory, nodePicker coordinator.NodePicker, procedureExecutingBatchSize uint32) Scheduler {
return RebalancedShardScheduler{
return &RebalancedShardScheduler{
logger: logger,
factory: factory,
nodePicker: nodePicker,
procedureExecutingBatchSize: procedureExecutingBatchSize,
}
}

func (r RebalancedShardScheduler) Schedule(ctx context.Context, clusterSnapshot metadata.Snapshot) (ScheduleResult, error) {
func (r *RebalancedShardScheduler) UpdateDeployMode(_ context.Context, enable bool) {
r.updateDeployMode(enable)
}

func (r *RebalancedShardScheduler) Schedule(ctx context.Context, clusterSnapshot metadata.Snapshot) (ScheduleResult, error) {
// RebalancedShardScheduler can only be scheduled when the cluster is not empty.
if clusterSnapshot.Topology.ClusterView.State == storage.ClusterStateEmpty {
return ScheduleResult{}, nil
@@ -45,20 +57,30 @@ func (r RebalancedShardScheduler) Schedule(ctx context.Context, clusterSnapshot
shardIDs = append(shardIDs, shardID)
}
numShards := uint32(len(clusterSnapshot.Topology.ShardViewsMapping))
shardNodeMapping, err := r.nodePicker.PickNode(ctx, shardIDs, numShards, clusterSnapshot.RegisteredNodes)
if err != nil {
return ScheduleResult{}, err

// The shard node mapping is updated only when deployMode is disabled.
if !r.deployMode {
newShardNodeMapping, err := r.nodePicker.PickNode(ctx, shardIDs, numShards, clusterSnapshot.RegisteredNodes)
if err != nil {
return ScheduleResult{}, err
}
r.updateShardNodeMapping(newShardNodeMapping)
}

// Generate the assigned-shards mapping and transfer the leader if the node has changed.
assignedShardIDs := make(map[storage.ShardID]struct{}, numShards)
for _, shardNode := range clusterSnapshot.Topology.ClusterView.ShardNodes {
if len(procedures) >= int(r.procedureExecutingBatchSize) {
r.logger.Warn("procedure length reached procedure executing batch size", zap.Uint32("procedureExecutingBatchSize", r.procedureExecutingBatchSize))
break
}

// Mark the shard assigned.
assignedShardIDs[shardNode.ID] = struct{}{}

newLeaderNode, ok := shardNodeMapping[shardNode.ID]
newLeaderNode, ok := r.latestShardNodeMapping[shardNode.ID]
assert.Assert(ok)
if newLeaderNode.Node.Name != shardNode.NodeName {
r.logger.Info("rebalanced shard scheduler generates transfer leader procedure", zap.Uint64("shardID", uint64(shardNode.ID)), zap.String("originNode", shardNode.NodeName), zap.String("newNode", newLeaderNode.Node.Name))
r.logger.Info("rebalanced shard scheduler try to assign shard to another node", zap.Uint64("shardID", uint64(shardNode.ID)), zap.String("originNode", shardNode.NodeName), zap.String("newNode", newLeaderNode.Node.Name))
p, err := r.factory.CreateTransferLeaderProcedure(ctx, coordinator.TransferLeaderRequest{
Snapshot: clusterSnapshot,
ShardID: shardNode.ID,
@@ -68,21 +90,25 @@ func (r RebalancedShardScheduler) Schedule(ctx context.Context, clusterSnapshot
if err != nil {
return ScheduleResult{}, err
}

procedures = append(procedures, p)
reasons.WriteString(fmt.Sprintf("shard is transferred to another node, shardID:%d, oldNode:%s, newNode:%s\n", shardNode.ID, shardNode.NodeName, newLeaderNode.Node.Name))
if len(procedures) >= int(r.procedureExecutingBatchSize) {
break
}
}
}

// Assign any shard that has not yet been assigned to a node.
for id := uint32(0); id < numShards; id++ {
if len(procedures) >= int(r.procedureExecutingBatchSize) {
r.logger.Warn("procedure length reached procedure executing batch size", zap.Uint32("procedureExecutingBatchSize", r.procedureExecutingBatchSize))
break
}

shardID := storage.ShardID(id)
if _, assigned := assignedShardIDs[shardID]; !assigned {
node, ok := shardNodeMapping[shardID]
node, ok := r.latestShardNodeMapping[shardID]
assert.Assert(ok)

r.logger.Info("rebalanced shard scheduler generates transfer leader procedure (assign to node)", zap.Uint32("shardID", id), zap.String("node", node.Node.Name))
r.logger.Info("rebalanced shard scheduler try to assign unassigned shard to node", zap.Uint32("shardID", id), zap.String("node", node.Node.Name))
p, err := r.factory.CreateTransferLeaderProcedure(ctx, coordinator.TransferLeaderRequest{
Snapshot: clusterSnapshot,
ShardID: shardID,
@@ -95,9 +121,6 @@ func (r RebalancedShardScheduler) Schedule(ctx context.Context, clusterSnapshot

procedures = append(procedures, p)
reasons.WriteString(fmt.Sprintf("shard is assigned to a node, shardID:%d, node:%s\n", shardID, node.Node.Name))
if len(procedures) >= int(r.procedureExecutingBatchSize) {
break
}
}
}

@@ -115,3 +138,17 @@ func (r RebalancedShardScheduler) Schedule(ctx context.Context, clusterSnapshot

return ScheduleResult{batchProcedure, reasons.String()}, nil
}

func (r *RebalancedShardScheduler) updateShardNodeMapping(newShardNodeMapping map[storage.ShardID]metadata.RegisteredNode) {
r.lock.Lock()
defer r.lock.Unlock()

r.latestShardNodeMapping = newShardNodeMapping
}

func (r *RebalancedShardScheduler) updateDeployMode(deployMode bool) {
r.lock.Lock()
defer r.lock.Unlock()

r.deployMode = deployMode
}
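
The core of this change is a mutex-guarded deployMode flag that freezes the cached shard-to-node mapping: while the flag is set, Schedule reuses latestShardNodeMapping instead of asking the NodePicker for a fresh assignment. Below is a minimal, self-contained sketch of that pattern; all names are illustrative rather than taken from the repository.

package main

import (
	"fmt"
	"sync"
)

type ShardID uint32

// sketchScheduler models only the deploy-mode mechanics of RebalancedShardScheduler.
type sketchScheduler struct {
	lock          sync.Mutex
	deployMode    bool
	latestMapping map[ShardID]string // cached shard -> node assignment
}

// schedule recomputes the mapping only while deploy mode is disabled.
func (s *sketchScheduler) schedule(pick func() map[ShardID]string) map[ShardID]string {
	s.lock.Lock()
	defer s.lock.Unlock()
	if !s.deployMode {
		s.latestMapping = pick()
	}
	return s.latestMapping
}

// updateDeployMode toggles the flag under the same lock.
func (s *sketchScheduler) updateDeployMode(enable bool) {
	s.lock.Lock()
	defer s.lock.Unlock()
	s.deployMode = enable
}

func main() {
	s := &sketchScheduler{}
	s.schedule(func() map[ShardID]string { return map[ShardID]string{0: "node-a"} })
	s.updateDeployMode(true) // lock the topology
	// With deploy mode on, a new pick result is ignored and the cached mapping is reused.
	m := s.schedule(func() map[ShardID]string { return map[ShardID]string{0: "node-b"} })
	fmt.Println(m[0]) // prints "node-a"
}

Note that the sketch holds the lock across the whole scheduling step, whereas the Schedule method in this diff reads deployMode and latestShardNodeMapping without taking the lock; only the two small setters lock it.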
4 changes: 4 additions & 0 deletions server/coordinator/scheduler/reopen_shard_scheduler.go
@@ -27,6 +27,10 @@ func NewReopenShardScheduler(factory *coordinator.Factory, procedureExecutingBat
}
}

func (r ReopenShardScheduler) UpdateDeployMode(_ context.Context, _ bool) {
// ReopenShardScheduler does not need deployMode.
}

func (r ReopenShardScheduler) Schedule(ctx context.Context, clusterSnapshot metadata.Snapshot) (ScheduleResult, error) {
// ReopenShardScheduler can only be scheduled when the cluster is stable.
if !clusterSnapshot.Topology.IsStable() {
4 changes: 4 additions & 0 deletions server/coordinator/scheduler/scheduler.go
@@ -18,4 +18,8 @@ type ScheduleResult struct {
type Scheduler interface {
// Schedule will generate procedure based on current cluster snapshot, which will be submitted to ProcedureManager, and whether it is actually executed depends on the current state of ProcedureManager.
Schedule(ctx context.Context, clusterSnapshot metadata.Snapshot) (ScheduleResult, error)

// UpdateDeployMode updates deployMode for the scheduler.
// DeployMode means that the cluster topology is locked and the mapping between shards and nodes cannot be changed.
UpdateDeployMode(ctx context.Context, enable bool)
}
45 changes: 43 additions & 2 deletions server/coordinator/scheduler/scheduler_manager.go
@@ -21,8 +21,7 @@ import (
)

const (
schedulerInterval = time.Second * 5
defaultHashReplicas = 50
schedulerInterval = time.Second * 5
)

// Manager manages schedulers; it registers all schedulers when it starts.
@@ -31,12 +30,21 @@ const (
type Manager interface {
ListScheduler() []Scheduler

GetScheduler() Scheduler

Start(ctx context.Context) error

Stop(ctx context.Context) error

UpdateEnableSchedule(ctx context.Context, enableSchedule bool)

// UpdateDeployMode can only be used in dynamic mode; it returns an error when the topology type is static.
// When deployMode is enabled, the shard topology will not be updated; this is typically used in scenarios such as cluster deployment.
UpdateDeployMode(ctx context.Context, enable bool) error

// GetDeployMode can only be used in dynamic mode; it returns an error when the topology type is static.
GetDeployMode(ctx context.Context) (bool, error)

// Scheduler is called when a new heartbeat is received; every scheduler registered in schedulerManager is invoked to generate procedures.
// Scheduler can be triggered at a fixed time interval or by heartbeat.
Scheduler(ctx context.Context, clusterSnapshot metadata.Snapshot) []ScheduleResult
@@ -59,6 +67,7 @@ type ManagerImpl struct {
enableSchedule bool
topologyType storage.TopologyType
procedureExecutingBatchSize uint32
deployMode bool
}

func NewManager(logger *zap.Logger, procedureManager procedure.Manager, factory *coordinator.Factory, clusterMetadata *metadata.ClusterMetadata, client *clientv3.Client, rootPath string, enableSchedule bool, topologyType storage.TopologyType, procedureExecutingBatchSize uint32) Manager {
@@ -215,6 +224,11 @@ func (m *ManagerImpl) ListScheduler() []Scheduler {
return m.registerSchedulers
}

func (m *ManagerImpl) GetScheduler() Scheduler {
// TODO: implement me
panic("implement me")
}

func (m *ManagerImpl) Scheduler(ctx context.Context, clusterSnapshot metadata.Snapshot) []ScheduleResult {
// TODO: Every scheduler should run in an independent goroutine.
results := make([]ScheduleResult, 0, len(m.registerSchedulers))
@@ -236,3 +250,30 @@ func (m *ManagerImpl) UpdateEnableSchedule(_ context.Context, enableSchedule boo

m.logger.Info("scheduler manager update enableSchedule", zap.Bool("enableSchedule", enableSchedule))
}

func (m *ManagerImpl) UpdateDeployMode(ctx context.Context, enable bool) error {
m.lock.Lock()
defer m.lock.Unlock()

if m.topologyType != storage.TopologyTypeDynamic {
return errors.WithMessage(ErrInvalidTopologyType, "deploy mode could only update when topology type is dynamic")
}

m.deployMode = enable
for _, scheduler := range m.registerSchedulers {
scheduler.UpdateDeployMode(ctx, enable)
}

return nil
}

func (m *ManagerImpl) GetDeployMode(_ context.Context) (bool, error) {
m.lock.RLock()
defer m.lock.RUnlock()

if m.topologyType != storage.TopologyTypeDynamic {
return false, errors.WithMessage(ErrInvalidTopologyType, "deploy mode could only get when topology type is dynamic")
}

return m.deployMode, nil
}
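
At the manager level, the new methods are a topology-type guard followed by a fan-out to every registered scheduler. A compact standalone sketch of that shape follows, under illustrative names; the real ManagerImpl carries much more state.

package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

type scheduler interface {
	UpdateDeployMode(ctx context.Context, enable bool)
}

type manager struct {
	lock       sync.RWMutex
	dynamic    bool // stands in for topologyType == storage.TopologyTypeDynamic
	deployMode bool
	schedulers []scheduler
}

var errInvalidTopologyType = errors.New("invalid topology type")

// UpdateDeployMode mirrors ManagerImpl.UpdateDeployMode: reject static topologies,
// record the flag, then propagate it to every registered scheduler.
func (m *manager) UpdateDeployMode(ctx context.Context, enable bool) error {
	m.lock.Lock()
	defer m.lock.Unlock()
	if !m.dynamic {
		return fmt.Errorf("deploy mode can only be updated when topology type is dynamic: %w", errInvalidTopologyType)
	}
	m.deployMode = enable
	for _, s := range m.schedulers {
		s.UpdateDeployMode(ctx, enable)
	}
	return nil
}

func main() {
	m := &manager{dynamic: false}
	fmt.Println(m.UpdateDeployMode(context.Background(), true)) // rejected: static topology
}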
4 changes: 4 additions & 0 deletions server/coordinator/scheduler/static_topology_shard_scheduler.go
@@ -25,6 +25,10 @@ func NewStaticTopologyShardScheduler(factory *coordinator.Factory, nodePicker co
return StaticTopologyShardScheduler{factory: factory, nodePicker: nodePicker, procedureExecutingBatchSize: procedureExecutingBatchSize}
}

func (s StaticTopologyShardScheduler) UpdateDeployMode(_ context.Context, _ bool) {
// StaticTopologyShardScheduler does not need deployMode.
}

func (s StaticTopologyShardScheduler) Schedule(ctx context.Context, clusterSnapshot metadata.Snapshot) (ScheduleResult, error) {
var procedures []procedure.Procedure
var reasons strings.Builder
48 changes: 48 additions & 0 deletions server/service/http/api.go
@@ -56,6 +56,8 @@ func (a *API) NewAPIRouter() *Router {
router.Put(fmt.Sprintf("/cluster/:%s", clusterNameParam), wrap(a.updateCluster, true, a.forwardClient))
router.Get(fmt.Sprintf("/cluster/:%s/procedure", clusterNameParam), wrap(a.listProcedures, true, a.forwardClient))
router.Post("/table/query", wrap(a.queryTable, true, a.forwardClient))
router.Get(fmt.Sprintf("/cluster/:%s/deployMode", clusterNameParam), wrap(a.getDeployMode, true, a.forwardClient))
router.Put(fmt.Sprintf("/cluster/:%s/deployMode", clusterNameParam), wrap(a.updateDeployMode, true, a.forwardClient))

// Register debug API.
router.GetWithoutPrefix("/debug/pprof/profile", pprof.Profile)
@@ -408,6 +410,52 @@ func (a *API) queryTable(r *http.Request) apiFuncResult {
return okResult(tables)
}

func (a *API) getDeployMode(r *http.Request) apiFuncResult {
ctx := r.Context()
clusterName := Param(ctx, clusterNameParam)
if len(clusterName) == 0 {
clusterName = config.DefaultClusterName
}

c, err := a.clusterManager.GetCluster(ctx, clusterName)
if err != nil {
return errResult(ErrGetCluster, fmt.Sprintf("clusterName: %s, err: %s", clusterName, err.Error()))
}

deployMode, err := c.GetSchedulerManager().GetDeployMode(r.Context())
if err != nil {
return errResult(ErrGetDeployMode, err.Error())
}

return okResult(deployMode)
}

func (a *API) updateDeployMode(r *http.Request) apiFuncResult {
ctx := r.Context()
clusterName := Param(ctx, clusterNameParam)
if len(clusterName) == 0 {
clusterName = config.DefaultClusterName
}

c, err := a.clusterManager.GetCluster(ctx, clusterName)
if err != nil {
return errResult(ErrGetCluster, fmt.Sprintf("clusterName: %s, err: %s", clusterName, err.Error()))
}

var req UpdateDeployModeRequest
err = json.NewDecoder(r.Body).Decode(&req)
if err != nil {
return errResult(ErrParseRequest, err.Error())
}

err = c.GetSchedulerManager().UpdateDeployMode(r.Context(), req.Enable)
if err != nil {
return errResult(ErrUpdateDeployMode, err.Error())
}

return okResult(req.Enable)
}

func (a *API) diagnoseShards(req *http.Request) apiFuncResult {
ctx := req.Context()
clusterName := Param(ctx, clusterNameParam)
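
The two new routes can be exercised with any HTTP client. Below is a hedged Go sketch: the listen address and the URL prefix are assumptions (both depend on server configuration not shown in this diff), and "myCluster" is a placeholder cluster name; the {"enable": true} body matches UpdateDeployModeRequest from types.go.

package main

import (
	"bytes"
	"fmt"
	"io"
	"net/http"
)

func main() {
	// Assumed base URL; the real address and route prefix come from the server config.
	base := "http://127.0.0.1:8080/api/v1"

	// Enable deploy mode for the cluster "myCluster" (placeholder name).
	req, err := http.NewRequest(http.MethodPut,
		base+"/cluster/myCluster/deployMode",
		bytes.NewBufferString(`{"enable": true}`))
	if err != nil {
		panic(err)
	}
	req.Header.Set("Content-Type", "application/json")
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()
	body, _ := io.ReadAll(resp.Body)
	fmt.Println("PUT:", resp.Status, string(body))

	// Read the current deploy mode back.
	getResp, err := http.Get(base + "/cluster/myCluster/deployMode")
	if err != nil {
		panic(err)
	}
	defer getResp.Body.Close()
	body, _ = io.ReadAll(getResp.Body)
	fmt.Println("GET:", getResp.Status, string(body))
}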
2 changes: 2 additions & 0 deletions server/service/http/error.go
@@ -18,6 +18,8 @@ var (
ErrHealthCheck = coderr.NewCodeError(coderr.Internal, "server health check")
ErrParseTopology = coderr.NewCodeError(coderr.Internal, "parse topology type")
ErrUpdateFlowLimiter = coderr.NewCodeError(coderr.Internal, "update flow limiter")
ErrGetDeployMode = coderr.NewCodeError(coderr.Internal, "get deploy mode")
ErrUpdateDeployMode = coderr.NewCodeError(coderr.Internal, "update deploy mode")
ErrAddLearner = coderr.NewCodeError(coderr.Internal, "add member as learner")
ErrListMembers = coderr.NewCodeError(coderr.Internal, "get member list")
ErrRemoveMembers = coderr.NewCodeError(coderr.Internal, "remove member")
4 changes: 4 additions & 0 deletions server/service/http/types.go
@@ -137,3 +137,7 @@ type UpdateFlowLimiterRequest struct {
Burst int `json:"burst"`
Enable bool `json:"enable"`
}

type UpdateDeployModeRequest struct {
Enable bool `json:"enable"`
}
