Skip to content
This repository has been archived by the owner on Feb 6, 2024. It is now read-only.

Commit

Permalink
refactor: refactor shard picker (#154)
Browse files Browse the repository at this point in the history
  • Loading branch information
ZuLiangWang authored Apr 13, 2023
1 parent 7fc4b32 commit 93f830d
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ type rawData struct {
FsmState string
State procedure.State

snapShot metadata.Snapshot
snapshot metadata.Snapshot
ShardID storage.ShardID
OldLeaderNodeName string
NewLeaderNodeName string
Expand Down Expand Up @@ -301,7 +301,7 @@ func (p *Procedure) convertToMeta() (procedure.Meta, error) {
ID: p.params.ID,
FsmState: p.fsm.Current(),
ShardID: p.params.ShardID,
snapShot: p.params.ClusterSnapShot,
snapshot: p.params.ClusterSnapShot,
OldLeaderNodeName: p.params.OldLeaderNodeName,
NewLeaderNodeName: p.params.NewLeaderNodeName,
State: p.state,
Expand Down
53 changes: 23 additions & 30 deletions server/coordinator/shard_picker.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ import (
"crypto/rand"
"math/big"

"github.com/CeresDB/ceresmeta/server/cluster"
"github.com/CeresDB/ceresmeta/server/cluster/metadata"
"github.com/CeresDB/ceresmeta/server/storage"
"github.com/pkg/errors"
)

Expand All @@ -18,59 +18,52 @@ import (
// If enableDuplicateNode is true, pick shard will return shards on the same node.
// TODO: Consider refactor this interface, abstracts the parameters of PickShards as PickStrategy.
type ShardPicker interface {
PickShards(ctx context.Context, clusterName string, expectShardNum int, enableDuplicateNode bool) ([]metadata.ShardNodeWithVersion, error)
PickShards(ctx context.Context, snapshot metadata.Snapshot, expectShardNum int, enableDuplicateNode bool) ([]storage.ShardNode, error)
}

// RandomBalancedShardPicker randomly pick up shards that are not on the same node in the current cluster.
type RandomBalancedShardPicker struct {
clusterManager cluster.Manager
}
type RandomBalancedShardPicker struct{}

func NewRandomBalancedShardPicker(manager cluster.Manager) ShardPicker {
return &RandomBalancedShardPicker{
clusterManager: manager,
}
func NewRandomBalancedShardPicker() ShardPicker {
return &RandomBalancedShardPicker{}
}

// PickShards will pick a specified number of shards as expectShardNum.
func (p *RandomBalancedShardPicker) PickShards(ctx context.Context, clusterName string, expectShardNum int, enableDuplicateNode bool) ([]metadata.ShardNodeWithVersion, error) {
getNodeShardResult, err := p.clusterManager.GetNodeShards(ctx, clusterName)
if err != nil {
return []metadata.ShardNodeWithVersion{}, errors.WithMessage(err, "get node shards")
}
func (p *RandomBalancedShardPicker) PickShards(_ context.Context, snapshot metadata.Snapshot, expectShardNum int, enableDuplicateNode bool) ([]storage.ShardNode, error) {
shardNodes := snapshot.Topology.ClusterView.ShardNodes

nodeShardsMapping := make(map[string][]metadata.ShardNodeWithVersion, 0)
for _, nodeShard := range getNodeShardResult.NodeShards {
_, exists := nodeShardsMapping[nodeShard.ShardNode.NodeName]
nodeShardsMapping := make(map[string][]storage.ShardNode, 0)
for _, shardNode := range shardNodes {
_, exists := nodeShardsMapping[shardNode.NodeName]
if !exists {
nodeShards := []metadata.ShardNodeWithVersion{}
nodeShardsMapping[nodeShard.ShardNode.NodeName] = nodeShards
shardNodes := []storage.ShardNode{}
nodeShardsMapping[shardNode.NodeName] = shardNodes
}
nodeShardsMapping[nodeShard.ShardNode.NodeName] = append(nodeShardsMapping[nodeShard.ShardNode.NodeName], nodeShard)
nodeShardsMapping[shardNode.NodeName] = append(nodeShardsMapping[shardNode.NodeName], shardNode)
}

if !enableDuplicateNode {
if len(nodeShardsMapping) < expectShardNum {
return []metadata.ShardNodeWithVersion{}, errors.WithMessagef(ErrNodeNumberNotEnough, "number of nodes is:%d, expecet number of shards is:%d", len(nodeShardsMapping), expectShardNum)
return nil, errors.WithMessagef(ErrNodeNumberNotEnough, "number of nodes is:%d, expecet number of shards is:%d", len(nodeShardsMapping), expectShardNum)
}
}

// Try to make shards on different nodes.
result := []metadata.ShardNodeWithVersion{}
totalShardLength := len(getNodeShardResult.NodeShards)
tempNodeShardMapping := make(map[string][]metadata.ShardNodeWithVersion, len(nodeShardsMapping))
result := make([]storage.ShardNode, 0, expectShardNum)
totalShardLength := len(shardNodes)
tempNodeShardMapping := make(map[string][]storage.ShardNode, len(nodeShardsMapping))
for {
nodeNames := []string{}
nodeNames := make([]string, 0, len(nodeShardsMapping))
for nodeName := range nodeShardsMapping {
nodeNames = append(nodeNames, nodeName)
}

// Reset node shards when shard is all picked.
if len(result)%totalShardLength == 0 {
for nodeName, nodeShard := range nodeShardsMapping {
tempNodeShard := make([]metadata.ShardNodeWithVersion, len(nodeShard))
copy(tempNodeShard, nodeShard)
tempNodeShardMapping[nodeName] = tempNodeShard
for nodeName, shardNode := range nodeShardsMapping {
tempShardNode := make([]storage.ShardNode, len(shardNode))
copy(tempShardNode, shardNode)
tempNodeShardMapping[nodeName] = tempShardNode
}
}

Expand All @@ -81,7 +74,7 @@ func (p *RandomBalancedShardPicker) PickShards(ctx context.Context, clusterName

selectNodeIndex, err := rand.Int(rand.Reader, big.NewInt(int64(len(nodeNames))))
if err != nil {
return []metadata.ShardNodeWithVersion{}, errors.WithMessage(err, "generate random node index")
return nil, errors.WithMessage(err, "generate random node index")
}

nodeShards := tempNodeShardMapping[nodeNames[selectNodeIndex.Int64()]]
Expand Down

0 comments on commit 93f830d

Please sign in to comment.