This repository has been archived by the owner on Feb 6, 2024. It is now read-only.

refactor: make create table idempotent #286

Merged · 13 commits · Jan 23, 2024
2 changes: 1 addition & 1 deletion server/cluster/cluster.go
@@ -58,7 +58,7 @@ func NewCluster(logger *zap.Logger, metadata *metadata.ClusterMetadata, client *
dispatch := eventdispatch.NewDispatchImpl()

procedureIDRootPath := strings.Join([]string{rootPath, metadata.Name(), defaultProcedurePrefixKey}, "/")
procedureFactory := coordinator.NewFactory(logger, id.NewAllocatorImpl(logger, client, procedureIDRootPath, defaultAllocStep), dispatch, procedureStorage)
procedureFactory := coordinator.NewFactory(logger, id.NewAllocatorImpl(logger, client, procedureIDRootPath, defaultAllocStep), dispatch, procedureStorage, metadata)

schedulerManager := manager.NewManager(logger, procedureManager, procedureFactory, metadata, client, rootPath, metadata.GetTopologyType(), metadata.GetProcedureExecutingBatchSize())

33 changes: 32 additions & 1 deletion server/cluster/metadata/cluster_metadata.go
@@ -115,7 +115,8 @@ func (c *ClusterMetadata) Load(ctx context.Context) error {
return errors.WithMessage(err, "load table manager")
}

if err := c.topologyManager.Load(ctx); err != nil {
schemas := c.tableManager.GetSchemas()
if err := c.topologyManager.Load(ctx, schemas); err != nil {
return errors.WithMessage(err, "load topology manager")
}

@@ -279,6 +280,11 @@ func (c *ClusterMetadata) GetTable(schemaName, tableName string) (storage.Table,
return c.tableManager.GetTable(schemaName, tableName)
}

// GetTableShard returns the shard where the table actually exists.
func (c *ClusterMetadata) GetTableShard(ctx context.Context, table storage.Table) (storage.ShardID, bool) {
return c.topologyManager.GetTableShardID(ctx, table)
}

func (c *ClusterMetadata) CreateTableMetadata(ctx context.Context, request CreateTableMetadataRequest) (CreateTableMetadataResult, error) {
c.logger.Info("create table start", zap.String("cluster", c.Name()), zap.String("schemaName", request.SchemaName), zap.String("tableName", request.TableName))

@@ -392,6 +398,31 @@ func (c *ClusterMetadata) CreateTable(ctx context.Context, request CreateTableRe
return ret, nil
}

func (c *ClusterMetadata) GetTableAssignedShard(ctx context.Context, schemaName string, tableName string) (storage.ShardID, bool, error) {
schema, exists := c.tableManager.GetSchema(schemaName)
if !exists {
return 0, false, errors.WithMessagef(ErrSchemaNotFound, "schema %s not found", schemaName)
}
shardID, exists := c.topologyManager.GetTableAssignedShard(ctx, schema.ID, tableName)
return shardID, exists, nil
}

func (c *ClusterMetadata) AssignTableToShard(ctx context.Context, schemaName string, tableName string, shardID storage.ShardID) error {
schema, exists := c.tableManager.GetSchema(schemaName)
if !exists {
return errors.WithMessagef(ErrSchemaNotFound, "schema %s not found", schemaName)
}
return c.topologyManager.AssignTableToShard(ctx, schema.ID, tableName, shardID)
}

func (c *ClusterMetadata) DeleteTableAssignedShard(ctx context.Context, schemaName string, tableName string) error {
schema, exists := c.tableManager.GetSchema(schemaName)
if !exists {
return errors.WithMessagef(ErrSchemaNotFound, "schema %s not found", schemaName)
}
return c.topologyManager.DeleteTableAssignedShard(ctx, schema.ID, tableName)
}

func (c *ClusterMetadata) GetShards() []storage.ShardID {
return c.topologyManager.GetShards()
}
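Taken together, the three helpers above form a small assignment API on ClusterMetadata. A minimal sketch of how a caller might drive it, assuming a loaded *metadata.ClusterMetadata named meta, a shard already chosen as picked, and the project's context, metadata, and storage imports (the function name, schema, and table here are illustrative, not from this PR):

// Sketch: resolve a table's shard idempotently via the new helpers.
func resolveShard(ctx context.Context, meta *metadata.ClusterMetadata, picked storage.ShardID) (storage.ShardID, error) {
	shardID, exists, err := meta.GetTableAssignedShard(ctx, "public", "metrics")
	if err != nil {
		return 0, err
	}
	if exists {
		// A previous (possibly crashed) attempt already assigned a shard; reuse it.
		return shardID, nil
	}
	// First attempt: persist the choice so that retries land on the same shard.
	if err := meta.AssignTableToShard(ctx, "public", "metrics", picked); err != nil {
		return 0, err
	}
	return picked, nil
}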
95 changes: 93 additions & 2 deletions server/cluster/metadata/topology_manager.go
@@ -33,7 +33,7 @@ import (
// TopologyManager manages the cluster topology, including the mapping relationship between shards, nodes, and tables.
type TopologyManager interface {
// Load loads the cluster topology from storage.
Load(ctx context.Context) error
Load(ctx context.Context, schemas []storage.Schema) error
// GetVersion returns the cluster view version.
GetVersion() uint64
// GetClusterState returns the cluster view state.
@@ -44,6 +44,14 @@ type TopologyManager interface {
AddTable(ctx context.Context, shardID storage.ShardID, latestVersion uint64, tables []storage.Table) error
// RemoveTable remove table on target shards from cluster topology.
RemoveTable(ctx context.Context, shardID storage.ShardID, latestVersion uint64, tableIDs []storage.TableID) error
// GetTableShardID returns the ID of the shard where the table is located.
GetTableShardID(ctx context.Context, table storage.Table) (storage.ShardID, bool)
// AssignTableToShard persists the table-to-shard mapping; it stores assignment results so that table creation is idempotent.
AssignTableToShard(ctx context.Context, schemaID storage.SchemaID, tableName string, shardID storage.ShardID) error
// GetTableAssignedShard returns the persisted shard assignment of a table.
GetTableAssignedShard(ctx context.Context, schemaID storage.SchemaID, tableName string) (storage.ShardID, bool)
// DeleteTableAssignedShard deletes the persisted shard assignment of a table.
DeleteTableAssignedShard(ctx context.Context, schemaID storage.SchemaID, tableName string) error
// GetShards get all shards in cluster topology.
GetShards() []storage.ShardID
// GetShardNodesByID get shardNodes with shardID.
@@ -133,6 +141,8 @@ type TopologyManagerImpl struct {
// ShardView in memory.
shardTablesMapping map[storage.ShardID]*storage.ShardView // ShardID -> shardTopology
tableShardMapping map[storage.TableID][]storage.ShardID // tableID -> ShardID
// Table assign result in memory.
tableAssignMapping map[storage.SchemaID]map[string]storage.ShardID // SchemaID -> (tableName -> ShardID)

nodes map[string]storage.Node // NodeName in memory.
}
@@ -150,11 +160,12 @@ func NewTopologyManagerImpl(logger *zap.Logger, storage storage.Storage, cluster
nodeShardsMapping: nil,
shardTablesMapping: nil,
tableShardMapping: nil,
tableAssignMapping: nil,
nodes: nil,
}
}

func (m *TopologyManagerImpl) Load(ctx context.Context) error {
func (m *TopologyManagerImpl) Load(ctx context.Context, schemas []storage.Schema) error {
m.lock.Lock()
defer m.lock.Unlock()

@@ -169,6 +180,11 @@ func (m *TopologyManagerImpl) Load(ctx context.Context) error {
if err := m.loadNodes(ctx); err != nil {
return errors.WithMessage(err, "load nodes")
}

if err := m.loadAssignTable(ctx, schemas); err != nil {
return errors.WithMessage(err, "load assign table")
}

return nil
}

@@ -294,6 +310,64 @@ func (m *TopologyManagerImpl) RemoveTable(ctx context.Context, shardID storage.S
return nil
}

func (m *TopologyManagerImpl) GetTableShardID(_ context.Context, table storage.Table) (storage.ShardID, bool) {
m.lock.RLock()
defer m.lock.RUnlock()

// A table may be mapped to several shards; return the first one recorded.
shardIDs, exists := m.tableShardMapping[table.ID]
if exists {
return shardIDs[0], true
}

return 0, false
}

func (m *TopologyManagerImpl) AssignTableToShard(ctx context.Context, schemaID storage.SchemaID, tableName string, shardID storage.ShardID) error {
m.lock.Lock()
defer m.lock.Unlock()

if err := m.storage.AssignTableToShard(ctx, storage.AssignTableToShardRequest{
ClusterID: m.clusterID,
SchemaID: schemaID,
TableName: tableName,
ShardID: shardID,
}); err != nil {
return errors.WithMessage(err, "storage assign table")
}

// Update the cache in memory.
if _, exists := m.tableAssignMapping[schemaID]; !exists {
m.tableAssignMapping[schemaID] = make(map[string]storage.ShardID)
}

m.tableAssignMapping[schemaID][tableName] = shardID

return nil
}

func (m *TopologyManagerImpl) GetTableAssignedShard(_ context.Context, schemaID storage.SchemaID, tableName string) (storage.ShardID, bool) {
m.lock.RLock()
defer m.lock.RUnlock()

assignResult, exists := m.tableAssignMapping[schemaID][tableName]
return assignResult, exists
}

func (m *TopologyManagerImpl) DeleteTableAssignedShard(ctx context.Context, schemaID storage.SchemaID, tableName string) error {
m.lock.Lock()
defer m.lock.Unlock()

if err := m.storage.DeleteTableAssignedShard(ctx, storage.DeleteTableAssignedRequest{
ClusterID: m.clusterID,
SchemaID: schemaID,
TableName: tableName,
}); err != nil {
return errors.WithMessage(err, "storage delete assign table")
}

// Update the cache in memory.
delete(m.tableAssignMapping[schemaID], tableName)

return nil
}

func (m *TopologyManagerImpl) GetShards() []storage.ShardID {
m.lock.RLock()
defer m.lock.RUnlock()
@@ -584,6 +658,23 @@ func (m *TopologyManagerImpl) loadShardViews(ctx context.Context) error {
return nil
}

func (m *TopologyManagerImpl) loadAssignTable(ctx context.Context, schemas []storage.Schema) error {
m.tableAssignMapping = make(map[storage.SchemaID]map[string]storage.ShardID, len(schemas))
for _, schema := range schemas {
m.tableAssignMapping[schema.ID] = make(map[string]storage.ShardID)

listAssignTableResult, err := m.storage.ListTableAssignedShard(ctx, storage.ListAssignTableRequest{ClusterID: m.clusterID, SchemaID: schema.ID})
if err != nil {
return errors.WithMessage(err, "storage list assign table")
}
for _, assignTable := range listAssignTableResult.TableAssigns {
m.tableAssignMapping[schema.ID][assignTable.TableName] = assignTable.ShardID
}
}

return nil
}

func (m *TopologyManagerImpl) loadNodes(ctx context.Context) error {
nodesResult, err := m.storage.ListNodes(ctx, storage.ListNodesRequest{ClusterID: m.clusterID})
if err != nil {
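The in-memory cache behind this API is a two-level map: SchemaID to a per-schema map of table name to ShardID. A standalone sketch of the access pattern, with simplified stand-in types rather than the project's own definitions:

package main

import "fmt"

type SchemaID uint32
type ShardID uint32

func main() {
	// Two-level map mirroring tableAssignMapping: SchemaID -> (tableName -> ShardID).
	assign := map[SchemaID]map[string]ShardID{}

	// Insert: create the inner map lazily, as AssignTableToShard does.
	schemaID, table, shard := SchemaID(1), "metrics", ShardID(7)
	if _, ok := assign[schemaID]; !ok {
		assign[schemaID] = map[string]ShardID{}
	}
	assign[schemaID][table] = shard

	// Lookup: indexing a missing outer key yields a nil inner map, and indexing
	// a nil map safely returns the zero value, so both lookups can be chained
	// exactly as GetTableAssignedShard does.
	got, ok := assign[schemaID][table]
	fmt.Println(got, ok) // 7 true
}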
32 changes: 21 additions & 11 deletions server/coordinator/factory.go
@@ -43,7 +43,7 @@ type Factory struct {
idAllocator id.Allocator
dispatch eventdispatch.Dispatch
storage procedure.Storage
shardPicker ShardPicker
shardPicker *PersistShardPicker
}

type CreateTableRequest struct {
@@ -101,13 +101,13 @@ type BatchRequest struct {
BatchType procedure.Kind
}

func NewFactory(logger *zap.Logger, allocator id.Allocator, dispatch eventdispatch.Dispatch, storage procedure.Storage) *Factory {
func NewFactory(logger *zap.Logger, allocator id.Allocator, dispatch eventdispatch.Dispatch, storage procedure.Storage, clusterMetadata *metadata.ClusterMetadata) *Factory {
return &Factory{
idAllocator: allocator,
dispatch: dispatch,
storage: storage,
logger: logger,
shardPicker: NewLeastTableShardPicker(),
shardPicker: NewPersistShardPicker(clusterMetadata, NewLeastTableShardPicker()),
}
}

@@ -129,22 +129,32 @@ func (f *Factory) makeCreateTableProcedure(ctx context.Context, request CreateTa
}
snapshot := request.ClusterMetadata.GetClusterSnapshot()

shards, err := f.shardPicker.PickShards(ctx, snapshot, 1)
var targetShardID storage.ShardID
shardID, exists, err := request.ClusterMetadata.GetTableAssignedShard(ctx, request.SourceReq.SchemaName, request.SourceReq.Name)
if err != nil {
f.logger.Error("pick table shard", zap.Error(err))
return nil, errors.WithMessage(err, "pick table shard")
return nil, err
}
if len(shards) != 1 {
f.logger.Error("pick table shards length not equal 1", zap.Int("shards", len(shards)))
return nil, errors.WithMessagef(procedure.ErrPickShard, "pick table shard, shards length:%d", len(shards))
if exists {
targetShardID = shardID
} else {
shards, err := f.shardPicker.PickShards(ctx, snapshot, request.SourceReq.GetSchemaName(), []string{request.SourceReq.GetName()})
if err != nil {
f.logger.Error("pick table shard", zap.Error(err))
return nil, errors.WithMessage(err, "pick table shard")
}
if len(shards) != 1 {
f.logger.Error("pick table shards length not equal 1", zap.Int("shards", len(shards)))
return nil, errors.WithMessagef(procedure.ErrPickShard, "pick table shard, shards length:%d", len(shards))
}
targetShardID = shards[request.SourceReq.GetName()].ID
}

return createtable.NewProcedure(createtable.ProcedureParams{
Dispatch: f.dispatch,
ClusterMetadata: request.ClusterMetadata,
ClusterSnapshot: snapshot,
ID: id,
ShardID: shards[0].ID,
ShardID: targetShardID,
SourceReq: request.SourceReq,
OnSucceeded: request.OnSucceeded,
OnFailed: request.OnFailed,
@@ -164,7 +174,7 @@ func (f *Factory) makeCreatePartitionTableProcedure(ctx context.Context, request
nodeNames[shardNode.NodeName] = 1
}

subTableShards, err := f.shardPicker.PickShards(ctx, snapshot, len(request.SourceReq.PartitionTableInfo.SubTableNames))
subTableShards, err := f.shardPicker.PickShards(ctx, snapshot, request.SourceReq.GetSchemaName(), request.SourceReq.PartitionTableInfo.SubTableNames)
if err != nil {
return nil, errors.WithMessage(err, "pick sub table shards")
}
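With this change every pick goes through PersistShardPicker (added in the new file below), so plain and partitioned table creation share one persistence path. A hedged sketch of standalone usage, assuming ctx, snapshot, and clusterMeta are already in hand and the standard log package is imported (the variable names are illustrative):

// Sketch: the picker returns a tableName -> ShardNode map; tables it has
// seen before resolve from the persisted assignment instead of the balancer.
picker := coordinator.NewPersistShardPicker(clusterMeta, coordinator.NewLeastTableShardPicker())
nodes, err := picker.PickShards(ctx, snapshot, "public", []string{"t1", "t2"})
if err != nil {
	return err
}
for table, node := range nodes {
	log.Printf("table %s assigned to shard %d on node %s", table, node.ID, node.NodeName)
}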
2 changes: 1 addition & 1 deletion server/coordinator/factory_test.go
@@ -39,7 +39,7 @@ func setupFactory(t *testing.T) (*coordinator.Factory, *metadata.ClusterMetadata
dispatch := test.MockDispatch{}
allocator := test.MockIDAllocator{}
storage := test.NewTestStorage(t)
f := coordinator.NewFactory(zap.NewNop(), allocator, dispatch, storage)
f := coordinator.NewFactory(zap.NewNop(), allocator, dispatch, storage, c.GetMetadata())

return f, c.GetMetadata()
}
86 changes: 86 additions & 0 deletions server/coordinator/persist_shard_picker.go
@@ -0,0 +1,86 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package coordinator

import (
"context"

"github.com/apache/incubator-horaedb-meta/server/cluster/metadata"
"github.com/apache/incubator-horaedb-meta/server/storage"
)

type PersistShardPicker struct {
cluster *metadata.ClusterMetadata
internal ShardPicker
}

func NewPersistShardPicker(cluster *metadata.ClusterMetadata, internal ShardPicker) *PersistShardPicker {
return &PersistShardPicker{cluster: cluster, internal: internal}
}

func (p *PersistShardPicker) PickShards(ctx context.Context, snapshot metadata.Snapshot, schemaName string, tableNames []string) (map[string]storage.ShardNode, error) {
result := map[string]storage.ShardNode{}

shardNodeMap := map[storage.ShardID]storage.ShardNode{}
for _, shardNode := range snapshot.Topology.ClusterView.ShardNodes {
shardNodeMap[shardNode.ID] = shardNode
}

var missingTables []string
// If a table assignment already exists, reuse it.
for i := 0; i < len(tableNames); i++ {
shardID, exists, err := p.cluster.GetTableAssignedShard(ctx, schemaName, tableNames[i])
if err != nil {
return map[string]storage.ShardNode{}, err
}
if exists {
result[tableNames[i]] = shardNodeMap[shardID]
} else {
missingTables = append(missingTables, tableNames[i])
}
}

if len(result) == len(tableNames) {
return result, nil
}

var tablesNeedToAssignShard []string
if len(missingTables) > 0 {
tablesNeedToAssignShard = missingTables
} else {
tablesNeedToAssignShard = tableNames
}

// Pick shards for the remaining tables and persist the assignments so that retries reuse them.
shardNodes, err := p.internal.PickShards(ctx, snapshot, len(tablesNeedToAssignShard))
if err != nil {
return map[string]storage.ShardNode{}, err
}

for i, shardNode := range shardNodes {
result[tablesNeedToAssignShard[i]] = shardNode
err = p.cluster.AssignTableToShard(ctx, schemaName, tablesNeedToAssignShard[i], shardNode.ID)
if err != nil {
return map[string]storage.ShardNode{}, err
}
}

return result, nil
}
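A test sketch of the property this PR targets: two PickShards calls for the same table must agree. It assumes the setupFactory helper and the context, testing, testify, and coordinator imports from factory_test.go above; it is illustrative, not part of the PR:

func TestPickShardsIsIdempotent(t *testing.T) {
	_, meta := setupFactory(t)
	picker := coordinator.NewPersistShardPicker(meta, coordinator.NewLeastTableShardPicker())

	snapshot := meta.GetClusterSnapshot()
	first, err := picker.PickShards(context.Background(), snapshot, "public", []string{"t"})
	require.NoError(t, err)

	// The second pick must reuse the persisted assignment rather than re-balance.
	second, err := picker.PickShards(context.Background(), snapshot, "public", []string{"t"})
	require.NoError(t, err)
	require.Equal(t, first["t"].ID, second["t"].ID)
}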