diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index 6af9b2a9..95136177 100644 --- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -58,7 +58,7 @@ func NewCluster(logger *zap.Logger, metadata *metadata.ClusterMetadata, client * dispatch := eventdispatch.NewDispatchImpl() procedureIDRootPath := strings.Join([]string{rootPath, metadata.Name(), defaultProcedurePrefixKey}, "/") - procedureFactory := coordinator.NewFactory(logger, id.NewAllocatorImpl(logger, client, procedureIDRootPath, defaultAllocStep), dispatch, procedureStorage) + procedureFactory := coordinator.NewFactory(logger, id.NewAllocatorImpl(logger, client, procedureIDRootPath, defaultAllocStep), dispatch, procedureStorage, metadata) schedulerManager := manager.NewManager(logger, procedureManager, procedureFactory, metadata, client, rootPath, metadata.GetTopologyType(), metadata.GetProcedureExecutingBatchSize()) diff --git a/server/cluster/metadata/cluster_metadata.go b/server/cluster/metadata/cluster_metadata.go index ab9b293a..0ae03b6a 100644 --- a/server/cluster/metadata/cluster_metadata.go +++ b/server/cluster/metadata/cluster_metadata.go @@ -115,7 +115,8 @@ func (c *ClusterMetadata) Load(ctx context.Context) error { return errors.WithMessage(err, "load table manager") } - if err := c.topologyManager.Load(ctx); err != nil { + schemas := c.tableManager.GetSchemas() + if err := c.topologyManager.Load(ctx, schemas); err != nil { return errors.WithMessage(err, "load topology manager") } @@ -279,6 +280,11 @@ func (c *ClusterMetadata) GetTable(schemaName, tableName string) (storage.Table, return c.tableManager.GetTable(schemaName, tableName) } +// GetTableShard get the shard where the table actually exists. +func (c *ClusterMetadata) GetTableShard(ctx context.Context, table storage.Table) (storage.ShardID, bool) { + return c.topologyManager.GetTableShardID(ctx, table) +} + func (c *ClusterMetadata) CreateTableMetadata(ctx context.Context, request CreateTableMetadataRequest) (CreateTableMetadataResult, error) { c.logger.Info("create table start", zap.String("cluster", c.Name()), zap.String("schemaName", request.SchemaName), zap.String("tableName", request.TableName)) @@ -392,6 +398,31 @@ func (c *ClusterMetadata) CreateTable(ctx context.Context, request CreateTableRe return ret, nil } +func (c *ClusterMetadata) GetTableAssignedShard(ctx context.Context, schemaName string, tableName string) (storage.ShardID, bool, error) { + schema, exists := c.tableManager.GetSchema(schemaName) + if !exists { + return 0, false, errors.WithMessagef(ErrSchemaNotFound, "schema %s not found", schemaName) + } + shardIDs, exists := c.topologyManager.GetTableAssignedShard(ctx, schema.ID, tableName) + return shardIDs, exists, nil +} + +func (c *ClusterMetadata) AssignTableToShard(ctx context.Context, schemaName string, tableName string, shardID storage.ShardID) error { + schema, exists := c.tableManager.GetSchema(schemaName) + if !exists { + return errors.WithMessagef(ErrSchemaNotFound, "schema %s not found", schemaName) + } + return c.topologyManager.AssignTableToShard(ctx, schema.ID, tableName, shardID) +} + +func (c *ClusterMetadata) DeleteTableAssignedShard(ctx context.Context, schemaName string, tableName string) error { + schema, exists := c.tableManager.GetSchema(schemaName) + if !exists { + return errors.WithMessagef(ErrSchemaNotFound, "schema %s not found", schemaName) + } + return c.topologyManager.DeleteTableAssignedShard(ctx, schema.ID, tableName) +} + func (c *ClusterMetadata) GetShards() []storage.ShardID { 
	return c.topologyManager.GetShards()
 }
diff --git a/server/cluster/metadata/topology_manager.go b/server/cluster/metadata/topology_manager.go
index 2426bb0b..f744a5d9 100644
--- a/server/cluster/metadata/topology_manager.go
+++ b/server/cluster/metadata/topology_manager.go
@@ -33,7 +33,7 @@ import (
 // TopologyManager manages the cluster topology, including the mapping relationship between shards, nodes, and tables.
 type TopologyManager interface {
 	// Load load cluster topology from storage.
-	Load(ctx context.Context) error
+	Load(ctx context.Context, schemas []storage.Schema) error
 	// GetVersion get cluster view version.
 	GetVersion() uint64
 	// GetClusterState get cluster view state.
@@ -44,6 +44,14 @@ type TopologyManager interface {
 	AddTable(ctx context.Context, shardID storage.ShardID, latestVersion uint64, tables []storage.Table) error
 	// RemoveTable remove table on target shards from cluster topology.
 	RemoveTable(ctx context.Context, shardID storage.ShardID, latestVersion uint64, tableIDs []storage.TableID) error
+	// GetTableShardID gets the ID of the shard where the table is located.
+	GetTableShardID(ctx context.Context, table storage.Table) (storage.ShardID, bool)
+	// AssignTableToShard persists the table-to-shard mapping; it stores the assign result and makes table creation idempotent.
+	AssignTableToShard(ctx context.Context, schemaID storage.SchemaID, tableName string, shardID storage.ShardID) error
+	// GetTableAssignedShard gets the table assign result.
+	GetTableAssignedShard(ctx context.Context, schemaID storage.SchemaID, tableName string) (storage.ShardID, bool)
+	// DeleteTableAssignedShard deletes the table assign result.
+	DeleteTableAssignedShard(ctx context.Context, schemaID storage.SchemaID, tableName string) error
 	// GetShards get all shards in cluster topology.
 	GetShards() []storage.ShardID
 	// GetShardNodesByID get shardNodes with shardID.
@@ -133,6 +141,8 @@ type TopologyManagerImpl struct {
 	// ShardView in memory.
 	shardTablesMapping map[storage.ShardID]*storage.ShardView // ShardID -> shardTopology
 	tableShardMapping  map[storage.TableID][]storage.ShardID  // tableID -> ShardID
+	// Table assign result in memory.
+	tableAssignMapping map[storage.SchemaID]map[string]storage.ShardID // schemaID -> tableName -> shardID
 	nodes map[string]storage.Node // NodeName in memory.
 }
@@ -150,11 +160,12 @@ func NewTopologyManagerImpl(logger *zap.Logger, storage storage.Storage, cluster
 		nodeShardsMapping:  nil,
 		shardTablesMapping: nil,
 		tableShardMapping:  nil,
+		tableAssignMapping: nil,
 		nodes:              nil,
 	}
 }

-func (m *TopologyManagerImpl) Load(ctx context.Context) error {
+func (m *TopologyManagerImpl) Load(ctx context.Context, schemas []storage.Schema) error {
 	m.lock.Lock()
 	defer m.lock.Unlock()
@@ -169,6 +180,11 @@ func (m *TopologyManagerImpl) Load(ctx context.Context) error {
 	if err := m.loadNodes(ctx); err != nil {
 		return errors.WithMessage(err, "load nodes")
 	}
+
+	if err := m.loadAssignTable(ctx, schemas); err != nil {
+		return errors.WithMessage(err, "load assign table")
+	}
+
 	return nil
 }
@@ -294,6 +310,64 @@ func (m *TopologyManagerImpl) RemoveTable(ctx context.Context, shardID storage.S
 	return nil
 }

+func (m *TopologyManagerImpl) GetTableShardID(_ context.Context, table storage.Table) (storage.ShardID, bool) {
+	m.lock.RLock()
+	defer m.lock.RUnlock()
+
+	shardIDs, exists := m.tableShardMapping[table.ID]
+	if exists {
+		return shardIDs[0], true
+	}
+
+	return 0, false
+}
+
+func (m *TopologyManagerImpl) AssignTableToShard(ctx context.Context, schemaID storage.SchemaID, tableName string, shardID storage.ShardID) error {
+	m.lock.Lock()
+	defer m.lock.Unlock()
+
+	if err := m.storage.AssignTableToShard(ctx, storage.AssignTableToShardRequest{
+		ClusterID: m.clusterID,
+		SchemaID:  schemaID,
+		TableName: tableName,
+		ShardID:   shardID,
+	}); err != nil {
+		return errors.WithMessage(err, "storage assign table")
+	}
+
+	// Update cache in memory.
+	if _, exists := m.tableAssignMapping[schemaID]; !exists {
+		m.tableAssignMapping[schemaID] = make(map[string]storage.ShardID, 0)
+	}
+
+	m.tableAssignMapping[schemaID][tableName] = shardID
+
+	return nil
+}
+
+func (m *TopologyManagerImpl) GetTableAssignedShard(_ context.Context, schemaID storage.SchemaID, tableName string) (storage.ShardID, bool) {
+	m.lock.RLock()
+	defer m.lock.RUnlock()
+
+	assignResult, exists := m.tableAssignMapping[schemaID][tableName]
+	return assignResult, exists
+}
+
+func (m *TopologyManagerImpl) DeleteTableAssignedShard(ctx context.Context, schemaID storage.SchemaID, tableName string) error {
+	m.lock.Lock()
+	defer m.lock.Unlock()
+
+	if err := m.storage.DeleteTableAssignedShard(ctx, storage.DeleteTableAssignedRequest{
+		ClusterID: m.clusterID,
+		SchemaID:  schemaID,
+		TableName: tableName,
+	}); err != nil {
+		return errors.WithMessage(err, "storage delete assign table")
+	}
+
+	// Update cache in memory.
+ delete(m.tableAssignMapping[schemaID], tableName) + + return nil +} + func (m *TopologyManagerImpl) GetShards() []storage.ShardID { m.lock.RLock() defer m.lock.RUnlock() @@ -584,6 +658,23 @@ func (m *TopologyManagerImpl) loadShardViews(ctx context.Context) error { return nil } +func (m *TopologyManagerImpl) loadAssignTable(ctx context.Context, schemas []storage.Schema) error { + m.tableAssignMapping = make(map[storage.SchemaID]map[string]storage.ShardID, len(schemas)) + for _, schema := range schemas { + m.tableAssignMapping[schema.ID] = make(map[string]storage.ShardID, 0) + + listAssignTableResult, err := m.storage.ListTableAssignedShard(ctx, storage.ListAssignTableRequest{ClusterID: m.clusterID, SchemaID: schema.ID}) + if err != nil { + return errors.WithMessage(err, "storage list assign table") + } + for _, assignTable := range listAssignTableResult.TableAssigns { + m.tableAssignMapping[schema.ID][assignTable.TableName] = assignTable.ShardID + } + } + + return nil +} + func (m *TopologyManagerImpl) loadNodes(ctx context.Context) error { nodesResult, err := m.storage.ListNodes(ctx, storage.ListNodesRequest{ClusterID: m.clusterID}) if err != nil { diff --git a/server/coordinator/factory.go b/server/coordinator/factory.go index a830fe3b..9e12c7f1 100644 --- a/server/coordinator/factory.go +++ b/server/coordinator/factory.go @@ -43,7 +43,7 @@ type Factory struct { idAllocator id.Allocator dispatch eventdispatch.Dispatch storage procedure.Storage - shardPicker ShardPicker + shardPicker *PersistShardPicker } type CreateTableRequest struct { @@ -101,13 +101,13 @@ type BatchRequest struct { BatchType procedure.Kind } -func NewFactory(logger *zap.Logger, allocator id.Allocator, dispatch eventdispatch.Dispatch, storage procedure.Storage) *Factory { +func NewFactory(logger *zap.Logger, allocator id.Allocator, dispatch eventdispatch.Dispatch, storage procedure.Storage, clusterMetadata *metadata.ClusterMetadata) *Factory { return &Factory{ idAllocator: allocator, dispatch: dispatch, storage: storage, logger: logger, - shardPicker: NewLeastTableShardPicker(), + shardPicker: NewPersistShardPicker(clusterMetadata, NewLeastTableShardPicker()), } } @@ -129,14 +129,24 @@ func (f *Factory) makeCreateTableProcedure(ctx context.Context, request CreateTa } snapshot := request.ClusterMetadata.GetClusterSnapshot() - shards, err := f.shardPicker.PickShards(ctx, snapshot, 1) + var targetShardID storage.ShardID + shardID, exists, err := request.ClusterMetadata.GetTableAssignedShard(ctx, request.SourceReq.SchemaName, request.SourceReq.Name) if err != nil { - f.logger.Error("pick table shard", zap.Error(err)) - return nil, errors.WithMessage(err, "pick table shard") + return nil, err } - if len(shards) != 1 { - f.logger.Error("pick table shards length not equal 1", zap.Int("shards", len(shards))) - return nil, errors.WithMessagef(procedure.ErrPickShard, "pick table shard, shards length:%d", len(shards)) + if exists { + targetShardID = shardID + } else { + shards, err := f.shardPicker.PickShards(ctx, snapshot, request.SourceReq.GetSchemaName(), []string{request.SourceReq.GetName()}) + if err != nil { + f.logger.Error("pick table shard", zap.Error(err)) + return nil, errors.WithMessage(err, "pick table shard") + } + if len(shards) != 1 { + f.logger.Error("pick table shards length not equal 1", zap.Int("shards", len(shards))) + return nil, errors.WithMessagef(procedure.ErrPickShard, "pick table shard, shards length:%d", len(shards)) + } + targetShardID = shards[request.SourceReq.GetName()].ID } return 
createtable.NewProcedure(createtable.ProcedureParams{ @@ -144,7 +154,7 @@ func (f *Factory) makeCreateTableProcedure(ctx context.Context, request CreateTa ClusterMetadata: request.ClusterMetadata, ClusterSnapshot: snapshot, ID: id, - ShardID: shards[0].ID, + ShardID: targetShardID, SourceReq: request.SourceReq, OnSucceeded: request.OnSucceeded, OnFailed: request.OnFailed, @@ -164,7 +174,7 @@ func (f *Factory) makeCreatePartitionTableProcedure(ctx context.Context, request nodeNames[shardNode.NodeName] = 1 } - subTableShards, err := f.shardPicker.PickShards(ctx, snapshot, len(request.SourceReq.PartitionTableInfo.SubTableNames)) + subTableShards, err := f.shardPicker.PickShards(ctx, snapshot, request.SourceReq.GetSchemaName(), request.SourceReq.PartitionTableInfo.SubTableNames) if err != nil { return nil, errors.WithMessage(err, "pick sub table shards") } diff --git a/server/coordinator/factory_test.go b/server/coordinator/factory_test.go index 336255a1..04c83c67 100644 --- a/server/coordinator/factory_test.go +++ b/server/coordinator/factory_test.go @@ -39,7 +39,7 @@ func setupFactory(t *testing.T) (*coordinator.Factory, *metadata.ClusterMetadata dispatch := test.MockDispatch{} allocator := test.MockIDAllocator{} storage := test.NewTestStorage(t) - f := coordinator.NewFactory(zap.NewNop(), allocator, dispatch, storage) + f := coordinator.NewFactory(zap.NewNop(), allocator, dispatch, storage, c.GetMetadata()) return f, c.GetMetadata() } diff --git a/server/coordinator/persist_shard_picker.go b/server/coordinator/persist_shard_picker.go new file mode 100644 index 00000000..b3684221 --- /dev/null +++ b/server/coordinator/persist_shard_picker.go @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package coordinator + +import ( + "context" + + "github.com/apache/incubator-horaedb-meta/server/cluster/metadata" + "github.com/apache/incubator-horaedb-meta/server/storage" +) + +type PersistShardPicker struct { + cluster *metadata.ClusterMetadata + internal ShardPicker +} + +func NewPersistShardPicker(cluster *metadata.ClusterMetadata, internal ShardPicker) *PersistShardPicker { + return &PersistShardPicker{cluster: cluster, internal: internal} +} + +func (p *PersistShardPicker) PickShards(ctx context.Context, snapshot metadata.Snapshot, schemaName string, tableNames []string) (map[string]storage.ShardNode, error) { + result := map[string]storage.ShardNode{} + + shardNodeMap := make(map[storage.ShardID]storage.ShardNode, len(tableNames)) + for _, shardNode := range snapshot.Topology.ClusterView.ShardNodes { + shardNodeMap[shardNode.ID] = shardNode + } + + var missingTables []string + // If table assign has been created, just reuse it. 
+	for i := 0; i < len(tableNames); i++ {
+		shardID, exists, err := p.cluster.GetTableAssignedShard(ctx, schemaName, tableNames[i])
+		if err != nil {
+			return map[string]storage.ShardNode{}, err
+		}
+		if exists {
+			result[tableNames[i]] = shardNodeMap[shardID]
+		} else {
+			missingTables = append(missingTables, tableNames[i])
+		}
+	}
+
+	// All tables have already been assigned to shards.
+	if len(missingTables) == 0 {
+		return result, nil
+	}
+
+	// For the remaining tables no assignment exists yet, so pick shards and save the assign results.
+	shardNodes, err := p.internal.PickShards(ctx, snapshot, len(missingTables))
+	if err != nil {
+		return map[string]storage.ShardNode{}, err
+	}
+
+	for i, shardNode := range shardNodes {
+		result[missingTables[i]] = shardNode
+		err = p.cluster.AssignTableToShard(ctx, schemaName, missingTables[i], shardNode.ID)
+		if err != nil {
+			return map[string]storage.ShardNode{}, err
+		}
+	}
+
+	return result, nil
+}
diff --git a/server/coordinator/persist_shard_picker_test.go b/server/coordinator/persist_shard_picker_test.go
new file mode 100644
index 00000000..67e3aa7e
--- /dev/null
+++ b/server/coordinator/persist_shard_picker_test.go
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package coordinator_test
+
+import (
+	"context"
+	"testing"
+
+	"github.com/apache/incubator-horaedb-meta/server/cluster/metadata"
+	"github.com/apache/incubator-horaedb-meta/server/coordinator"
+	"github.com/apache/incubator-horaedb-meta/server/coordinator/procedure/test"
+	"github.com/apache/incubator-horaedb-meta/server/storage"
+	"github.com/stretchr/testify/require"
+)
+
+func TestPersistShardPicker(t *testing.T) {
+	re := require.New(t)
+	ctx := context.Background()
+
+	c := test.InitStableCluster(ctx, t)
+
+	persistShardPicker := coordinator.NewPersistShardPicker(c.GetMetadata(), coordinator.NewLeastTableShardPicker())
+	pickResult, err := persistShardPicker.PickShards(ctx, c.GetMetadata().GetClusterSnapshot(), test.TestSchemaName, []string{test.TestTableName0})
+	re.NoError(err)
+	re.Equal(len(pickResult), 1)
+
+	createResult, err := c.GetMetadata().CreateTable(ctx, metadata.CreateTableRequest{
+		ShardID:       pickResult[test.TestTableName0].ID,
+		LatestVersion: 0,
+		SchemaName:    test.TestSchemaName,
+		TableName:     test.TestTableName0,
+		PartitionInfo: storage.PartitionInfo{Info: nil},
+	})
+	re.NoError(err)
+	re.Equal(test.TestTableName0, createResult.Table.Name)
+
+	// Try to pick a shard for the same table after the table is created.
+ newPickResult, err := persistShardPicker.PickShards(ctx, c.GetMetadata().GetClusterSnapshot(), test.TestSchemaName, []string{test.TestTableName0}) + re.NoError(err) + re.Equal(len(newPickResult), 1) + re.Equal(newPickResult[test.TestTableName0], pickResult[test.TestTableName0]) + + // Try to pick shard for another table. + pickResult, err = persistShardPicker.PickShards(ctx, c.GetMetadata().GetClusterSnapshot(), test.TestSchemaName, []string{test.TestTableName1}) + re.NoError(err) + re.Equal(len(pickResult), 1) + + err = c.GetMetadata().DropTable(ctx, metadata.DropTableRequest{ + SchemaName: test.TestSchemaName, + TableName: test.TestTableName0, + ShardID: pickResult[test.TestTableName0].ID, + LatestVersion: 0, + }) + re.NoError(err) + + // Try to pick shard for table1 after drop table0. + newPickResult, err = persistShardPicker.PickShards(ctx, c.GetMetadata().GetClusterSnapshot(), test.TestSchemaName, []string{test.TestTableName1}) + re.NoError(err) + re.Equal(len(pickResult), 1) + re.Equal(newPickResult[test.TestTableName1], pickResult[test.TestTableName1]) + + err = c.GetMetadata().DeleteTableAssignedShard(ctx, test.TestSchemaName, test.TestTableName1) + re.NoError(err) + + // Try to pick another for table1 after drop table1 assign result. + newPickResult, err = persistShardPicker.PickShards(ctx, c.GetMetadata().GetClusterSnapshot(), test.TestSchemaName, []string{test.TestTableName1}) + re.NoError(err) + re.Equal(len(pickResult), 1) + re.NotEqual(newPickResult[test.TestTableName1], pickResult[test.TestTableName1]) +} diff --git a/server/coordinator/procedure/ddl/createtable/create_table.go b/server/coordinator/procedure/ddl/createtable/create_table.go index 49138a44..292f0900 100644 --- a/server/coordinator/procedure/ddl/createtable/create_table.go +++ b/server/coordinator/procedure/ddl/createtable/create_table.go @@ -37,30 +37,34 @@ import ( ) const ( - eventCreateMetadata = "EventCreateMetadata" - eventCreateOnShard = "EventCreateOnShard" - eventFinish = "EventFinish" - - stateBegin = "StateBegin" - stateCreateMetadata = "StateCreateMetadata" - stateCreateOnShard = "StateCreateOnShard" - stateFinish = "StateFinish" + eventCheckTableExists = "EventCheckTableExists" + eventCreateMetadata = "EventCreateMetadata" + eventCreateOnShard = "EventCreateOnShard" + eventFinish = "EventFinish" + + stateBegin = "StateBegin" + stateCheckTableExists = "StateCheckTableExists" + stateCreateMetadata = "StateCreateMetadata" + stateCreateOnShard = "StateCreateOnShard" + stateFinish = "StateFinish" ) var ( createTableEvents = fsm.Events{ - {Name: eventCreateMetadata, Src: []string{stateBegin}, Dst: stateCreateMetadata}, + {Name: eventCheckTableExists, Src: []string{stateBegin}, Dst: stateCheckTableExists}, + {Name: eventCreateMetadata, Src: []string{stateCheckTableExists}, Dst: stateCreateMetadata}, {Name: eventCreateOnShard, Src: []string{stateCreateMetadata}, Dst: stateCreateOnShard}, {Name: eventFinish, Src: []string{stateCreateOnShard}, Dst: stateFinish}, } createTableCallbacks = fsm.Callbacks{ - eventCreateMetadata: createMetadataCallback, - eventCreateOnShard: createOnShard, - eventFinish: createFinish, + eventCheckTableExists: checkTableExists, + eventCreateMetadata: createMetadata, + eventCreateOnShard: createOnShard, + eventFinish: createFinish, } ) -func createMetadataCallback(event *fsm.Event) { +func checkTableExists(event *fsm.Event) { req, err := procedure.GetRequestFromEvent[*callbackRequest](event) if err != nil { procedure.CancelEventWithLog(event, err, "get request from event") @@ 
-68,6 +72,42 @@ func createMetadataCallback(event *fsm.Event) { } params := req.p.params + // Check whether the table metadata already exists. + table, exists, err := params.ClusterMetadata.GetTable(params.SourceReq.GetSchemaName(), params.SourceReq.GetName()) + if err != nil { + procedure.CancelEventWithLog(event, err, "get table metadata") + return + } + if !exists { + return + } + + // Check whether the table shard mapping already exists. + _, exists = params.ClusterMetadata.GetTableShard(req.ctx, table) + if exists { + procedure.CancelEventWithLog(event, metadata.ErrTableAlreadyExists, "table shard already exists") + return + } +} + +func createMetadata(event *fsm.Event) { + req, err := procedure.GetRequestFromEvent[*callbackRequest](event) + if err != nil { + procedure.CancelEventWithLog(event, err, "get request from event") + return + } + params := req.p.params + + _, exists, err := params.ClusterMetadata.GetTable(params.SourceReq.GetSchemaName(), params.SourceReq.GetName()) + if err != nil { + procedure.CancelEventWithLog(event, err, "get table metadata") + return + } + if exists { + log.Info("table metadata already exists", zap.String("schemaName", params.SourceReq.GetSchemaName()), zap.String("tableName", params.SourceReq.GetName())) + return + } + createTableMetadataRequest := metadata.CreateTableMetadataRequest{ SchemaName: params.SourceReq.GetSchemaName(), TableName: params.SourceReq.GetName(), @@ -139,6 +179,11 @@ func createFinish(event *fsm.Event) { procedure.CancelEventWithLog(event, err, "get request from event") return } + params := req.p.params + + if err := req.p.params.ClusterMetadata.DeleteTableAssignedShard(req.ctx, params.SourceReq.GetSchemaName(), params.SourceReq.GetName()); err != nil { + log.Warn("delete assign table failed", zap.String("schemaName", params.SourceReq.GetSchemaName()), zap.String("tableName", params.SourceReq.GetName())) + } assert.Assert(req.createTableResult != nil) if err := req.p.params.OnSucceeded(*req.createTableResult); err != nil { @@ -229,7 +274,6 @@ func (p *Procedure) Kind() procedure.Kind { func (p *Procedure) Start(ctx context.Context) error { p.updateState(procedure.StateRunning) - // Try to load persist data. 
 	req := &callbackRequest{
 		ctx: ctx,
 		p:   p,
 	}
@@ -239,6 +283,11 @@ func (p *Procedure) Start(ctx context.Context) error {
 	for {
 		switch p.fsm.Current() {
 		case stateBegin:
+			if err := p.fsm.Event(eventCheckTableExists, req); err != nil {
+				_ = p.params.OnFailed(err)
+				return err
+			}
+		case stateCheckTableExists:
 			if err := p.fsm.Event(eventCreateMetadata, req); err != nil {
 				_ = p.params.OnFailed(err)
 				return err
 			}
diff --git a/server/coordinator/procedure/error.go b/server/coordinator/procedure/error.go
index 165cb2cb..c59dc8fc 100644
--- a/server/coordinator/procedure/error.go
+++ b/server/coordinator/procedure/error.go
@@ -23,6 +23,7 @@ import "github.com/apache/incubator-horaedb-meta/pkg/coderr"
 var (
 	ErrShardLeaderNotFound  = coderr.NewCodeError(coderr.Internal, "shard leader not found")
+	ErrShardNotMatch        = coderr.NewCodeError(coderr.Internal, "target shard does not match persisted data")
 	ErrProcedureNotFound    = coderr.NewCodeError(coderr.Internal, "procedure not found")
 	ErrClusterConfigChanged = coderr.NewCodeError(coderr.Internal, "cluster config changed")
 	ErrTableNotExists       = coderr.NewCodeError(coderr.Internal, "table not exists")
diff --git a/server/coordinator/scheduler/manager/scheduler_manager_test.go b/server/coordinator/scheduler/manager/scheduler_manager_test.go
index 15d2bd6c..7edbf5ec 100644
--- a/server/coordinator/scheduler/manager/scheduler_manager_test.go
+++ b/server/coordinator/scheduler/manager/scheduler_manager_test.go
@@ -44,7 +44,7 @@ func TestSchedulerManager(t *testing.T) {
 	dispatch := test.MockDispatch{}
 	allocator := test.MockIDAllocator{}
 	s := test.NewTestStorage(t)
-	f := coordinator.NewFactory(zap.NewNop(), allocator, dispatch, s)
+	f := coordinator.NewFactory(zap.NewNop(), allocator, dispatch, s, c.GetMetadata())
 	_, client, _ := etcdutil.PrepareEtcdServerAndClient(t)
 	// Create scheduler manager with enableScheduler equal to false.
diff --git a/server/coordinator/scheduler/rebalanced/scheduler_test.go b/server/coordinator/scheduler/rebalanced/scheduler_test.go
index e451b532..8c931572 100644
--- a/server/coordinator/scheduler/rebalanced/scheduler_test.go
+++ b/server/coordinator/scheduler/rebalanced/scheduler_test.go
@@ -35,23 +35,25 @@ func TestRebalancedScheduler(t *testing.T) {
 	re := require.New(t)
 	ctx := context.Background()
 
-	procedureFactory := coordinator.NewFactory(zap.NewNop(), test.MockIDAllocator{}, test.MockDispatch{}, test.NewTestStorage(t))
-
-	s := rebalanced.NewShardScheduler(zap.NewNop(), procedureFactory, nodepicker.NewConsistentUniformHashNodePicker(zap.NewNop()), 1)
-
 	// EmptyCluster would be scheduled an empty procedure.
 	emptyCluster := test.InitEmptyCluster(ctx, t)
+	procedureFactory := coordinator.NewFactory(zap.NewNop(), test.MockIDAllocator{}, test.MockDispatch{}, test.NewTestStorage(t), emptyCluster.GetMetadata())
+	s := rebalanced.NewShardScheduler(zap.NewNop(), procedureFactory, nodepicker.NewConsistentUniformHashNodePicker(zap.NewNop()), 1)
 	result, err := s.Schedule(ctx, emptyCluster.GetMetadata().GetClusterSnapshot())
 	re.NoError(err)
 	re.Empty(result)
 
 	// PrepareCluster would be scheduled an empty procedure.
prepareCluster := test.InitPrepareCluster(ctx, t) + procedureFactory = coordinator.NewFactory(zap.NewNop(), test.MockIDAllocator{}, test.MockDispatch{}, test.NewTestStorage(t), prepareCluster.GetMetadata()) + s = rebalanced.NewShardScheduler(zap.NewNop(), procedureFactory, nodepicker.NewConsistentUniformHashNodePicker(zap.NewNop()), 1) _, err = s.Schedule(ctx, prepareCluster.GetMetadata().GetClusterSnapshot()) re.NoError(err) // StableCluster with all shards assigned would be scheduled a load balance procedure. stableCluster := test.InitStableCluster(ctx, t) + procedureFactory = coordinator.NewFactory(zap.NewNop(), test.MockIDAllocator{}, test.MockDispatch{}, test.NewTestStorage(t), stableCluster.GetMetadata()) + s = rebalanced.NewShardScheduler(zap.NewNop(), procedureFactory, nodepicker.NewConsistentUniformHashNodePicker(zap.NewNop()), 1) _, err = s.Schedule(ctx, stableCluster.GetMetadata().GetClusterSnapshot()) re.NoError(err) } diff --git a/server/coordinator/scheduler/reopen/scheduler_test.go b/server/coordinator/scheduler/reopen/scheduler_test.go index 8060e2df..2b23051a 100644 --- a/server/coordinator/scheduler/reopen/scheduler_test.go +++ b/server/coordinator/scheduler/reopen/scheduler_test.go @@ -35,12 +35,12 @@ import ( func TestReopenShardScheduler(t *testing.T) { re := require.New(t) ctx := context.Background() + emptyCluster := test.InitEmptyCluster(ctx, t) - procedureFactory := coordinator.NewFactory(zap.NewNop(), test.MockIDAllocator{}, test.MockDispatch{}, test.NewTestStorage(t)) + procedureFactory := coordinator.NewFactory(zap.NewNop(), test.MockIDAllocator{}, test.MockDispatch{}, test.NewTestStorage(t), emptyCluster.GetMetadata()) s := reopen.NewShardScheduler(procedureFactory, 1) - emptyCluster := test.InitEmptyCluster(ctx, t) // ReopenShardScheduler should not schedule when cluster is not stable. result, err := s.Schedule(ctx, emptyCluster.GetMetadata().GetClusterSnapshot()) re.NoError(err) diff --git a/server/coordinator/scheduler/static/scheduler_test.go b/server/coordinator/scheduler/static/scheduler_test.go index 929a2f07..d1c9b83e 100644 --- a/server/coordinator/scheduler/static/scheduler_test.go +++ b/server/coordinator/scheduler/static/scheduler_test.go @@ -35,24 +35,26 @@ func TestStaticTopologyScheduler(t *testing.T) { re := require.New(t) ctx := context.Background() - procedureFactory := coordinator.NewFactory(zap.NewNop(), test.MockIDAllocator{}, test.MockDispatch{}, test.NewTestStorage(t)) - - s := static.NewShardScheduler(procedureFactory, nodepicker.NewConsistentUniformHashNodePicker(zap.NewNop()), 1) - // EmptyCluster would be scheduled an empty procedure. emptyCluster := test.InitEmptyCluster(ctx, t) + procedureFactory := coordinator.NewFactory(zap.NewNop(), test.MockIDAllocator{}, test.MockDispatch{}, test.NewTestStorage(t), emptyCluster.GetMetadata()) + s := static.NewShardScheduler(procedureFactory, nodepicker.NewConsistentUniformHashNodePicker(zap.NewNop()), 1) result, err := s.Schedule(ctx, emptyCluster.GetMetadata().GetClusterSnapshot()) re.NoError(err) re.Empty(result) // PrepareCluster would be scheduled a transfer leader procedure. 
prepareCluster := test.InitPrepareCluster(ctx, t) + procedureFactory = coordinator.NewFactory(zap.NewNop(), test.MockIDAllocator{}, test.MockDispatch{}, test.NewTestStorage(t), prepareCluster.GetMetadata()) + s = static.NewShardScheduler(procedureFactory, nodepicker.NewConsistentUniformHashNodePicker(zap.NewNop()), 1) result, err = s.Schedule(ctx, prepareCluster.GetMetadata().GetClusterSnapshot()) re.NoError(err) re.NotEmpty(result) // StableCluster with all shards assigned would be scheduled a transfer leader procedure by hash rule. stableCluster := test.InitStableCluster(ctx, t) + procedureFactory = coordinator.NewFactory(zap.NewNop(), test.MockIDAllocator{}, test.MockDispatch{}, test.NewTestStorage(t), stableCluster.GetMetadata()) + s = static.NewShardScheduler(procedureFactory, nodepicker.NewConsistentUniformHashNodePicker(zap.NewNop()), 1) result, err = s.Schedule(ctx, stableCluster.GetMetadata().GetClusterSnapshot()) re.NoError(err) re.NotEmpty(result) diff --git a/server/etcdutil/util.go b/server/etcdutil/util.go index 5d29d805..62ef0773 100644 --- a/server/etcdutil/util.go +++ b/server/etcdutil/util.go @@ -21,6 +21,7 @@ package etcdutil import ( "context" + "path" "github.com/apache/incubator-horaedb-meta/pkg/log" clientv3 "go.etcd.io/etcd/client/v3" @@ -118,7 +119,7 @@ func Scan(ctx context.Context, client *clientv3.Client, startKey, endKey string, } } - // Check whether the keys is exhausted. + // Check whether the keys are exhausted. if len(resp.Kvs) < batchSize { return nil } @@ -126,3 +127,29 @@ func Scan(ctx context.Context, client *clientv3.Client, startKey, endKey string, lastKeyInPrevBatch = string(resp.Kvs[len(resp.Kvs)-1].Key) } } + +func ScanWithPrefix(ctx context.Context, client *clientv3.Client, prefix string, do func(key string, val []byte) error) error { + rangeEnd := clientv3.GetPrefixRangeEnd(prefix) + resp, err := client.Get(ctx, prefix, clientv3.WithRange(rangeEnd)) + if err != nil { + return ErrEtcdKVGet.WithCause(err) + } + // Check whether the keys are exhausted. + if len(resp.Kvs) == 0 { + return nil + } + + for _, item := range resp.Kvs { + err := do(string(item.Key), item.Value) + if err != nil { + return err + } + } + + return nil +} + +// GetLastPathSegment get the last path segment from completePath, path is split by '/'. +func GetLastPathSegment(completePath string) string { + return path.Base(path.Clean(completePath)) +} diff --git a/server/etcdutil/util_test.go b/server/etcdutil/util_test.go index fa2532d6..6b816a4a 100644 --- a/server/etcdutil/util_test.go +++ b/server/etcdutil/util_test.go @@ -105,3 +105,42 @@ func TestScanFailed(t *testing.T) { err := Scan(ctx, client, startKey, endKey, 10, do) r.Equal(fakeErr, err) } + +func TestScanWithPrefix(t *testing.T) { + r := require.New(t) + + _, client, closeSrv := PrepareEtcdServerAndClient(t) + defer closeSrv() + ctx := context.Background() + + // Build keys with different prefix. + keys := []string{} + keys = append(keys, "/prefix/0") + keys = append(keys, "/prefix/1") + keys = append(keys, "/diff/0") + + // Put the keys. + for _, key := range keys { + // Let the value equal key for simplicity. 
+		val := key
+		_, err := client.Put(ctx, key, val)
+		r.NoError(err)
+	}
+
+	var scanResult []string
+	do := func(key string, value []byte) error {
+		scanResult = append(scanResult, key)
+		return nil
+	}
+	err := ScanWithPrefix(ctx, client, "/prefix", do)
+	r.NoError(err)
+	r.Equal(len(scanResult), 2)
+}
+
+func TestGetLastPathSegment(t *testing.T) {
+	r := require.New(t)
+
+	path := "/prefix/a/b/c"
+	lastPathSegment := GetLastPathSegment(path)
+	r.Equal("c", lastPathSegment)
+}
diff --git a/server/storage/key_path.go b/server/storage/key_path.go
index 6036e59f..4627bc0b 100644
--- a/server/storage/key_path.go
+++ b/server/storage/key_path.go
@@ -36,6 +36,7 @@ const (
 	shardView     = "shard_view"
 	latestVersion = "latest_version"
 	info          = "info"
+	tableAssign   = "table_assign"
 )
 
 // makeSchemaKey returns the key path to the schema meta info.
@@ -128,6 +129,19 @@ func makeNameToIDKey(rootPath string, clusterID uint32, schemaID uint32, tableNa
 	return path.Join(rootPath, version, cluster, fmtID(uint64(clusterID)), schema, fmtID(uint64(schemaID)), tableNameToID, tableName)
 }
 
+// makeTableAssignKey returns the tableAssign key path.
+func makeTableAssignKey(rootPath string, clusterID uint32, schemaID uint32, tableName string) string {
+	// Example:
+	// v1/cluster/1/schema/1/table_assign/tableName1 -> shardID1
+	// v1/cluster/1/schema/1/table_assign/tableName2 -> shardID2
+	return path.Join(rootPath, version, cluster, fmtID(uint64(clusterID)), schema, fmtID(uint64(schemaID)), tableAssign, tableName)
+}
+
+// makeTableAssignPrefixKey returns the tableAssign prefix key path.
+func makeTableAssignPrefixKey(rootPath string, clusterID uint32, schemaID uint32) string {
+	return path.Join(rootPath, version, cluster, fmtID(uint64(clusterID)), schema, fmtID(uint64(schemaID)), tableAssign)
+}
+
 func fmtID(id uint64) string {
 	return fmt.Sprintf("%020d", id)
 }
diff --git a/server/storage/meta.go b/server/storage/meta.go
index 842995d5..b56da682 100644
--- a/server/storage/meta.go
+++ b/server/storage/meta.go
@@ -57,6 +57,13 @@ type Storage interface {
 	// DeleteTable delete table by table name in specified cluster and schema.
 	DeleteTable(ctx context.Context, req DeleteTableRequest) error
 
+	// AssignTableToShard saves the table assign result.
+	AssignTableToShard(ctx context.Context, req AssignTableToShardRequest) error
+	// DeleteTableAssignedShard deletes the table assign result.
+	DeleteTableAssignedShard(ctx context.Context, req DeleteTableAssignedRequest) error
+	// ListTableAssignedShard lists the table assign results.
+	ListTableAssignedShard(ctx context.Context, req ListAssignTableRequest) (ListTableAssignedShardResult, error)
+
 	// CreateShardViews create shard views in specified cluster.
 	CreateShardViews(ctx context.Context, req CreateShardViewsRequest) error
 	// ListShardViews list all shard views in specified cluster.
diff --git a/server/storage/storage_impl.go b/server/storage/storage_impl.go
index 8aaafdd8..e6b130e8 100644
--- a/server/storage/storage_impl.go
+++ b/server/storage/storage_impl.go
@@ -418,6 +418,73 @@ func (s *metaStorageImpl) DeleteTable(ctx context.Context, req DeleteTableReques
 	return nil
 }
 
+func (s *metaStorageImpl) AssignTableToShard(ctx context.Context, req AssignTableToShardRequest) error {
+	key := makeTableAssignKey(s.rootPath, uint32(req.ClusterID), uint32(req.SchemaID), req.TableName)
+
+	// Check whether the key exists; if not, save the table assign result. Otherwise, the table assign result already exists and an error is returned.
+	keyMissing := clientv3util.KeyMissing(key)
+	opCreateAssignTable := clientv3.OpPut(key, strconv.Itoa(int(req.ShardID)))
+
+	resp, err := s.client.Txn(ctx).
+		If(keyMissing).
+		Then(opCreateAssignTable).
+		Commit()
+	if err != nil {
+		return errors.WithMessagef(err, "create assign table, clusterID:%d, schemaID:%d, key:%s", req.ClusterID, req.SchemaID, key)
+	}
+	if !resp.Succeeded {
+		return ErrCreateSchemaAgain.WithCausef("assign table may already exist, clusterID:%d, schemaID:%d, key:%s, resp:%v", req.ClusterID, req.SchemaID, key, resp)
+	}
+
+	return nil
+}
+
+func (s *metaStorageImpl) DeleteTableAssignedShard(ctx context.Context, req DeleteTableAssignedRequest) error {
+	key := makeTableAssignKey(s.rootPath, uint32(req.ClusterID), uint32(req.SchemaID), req.TableName)
+
+	keyExists := clientv3util.KeyExists(key)
+	opDeleteAssignTable := clientv3.OpDelete(key)
+
+	resp, err := s.client.Txn(ctx).
+		If(keyExists).
+		Then(opDeleteAssignTable).
+		Commit()
+	if err != nil {
+		return errors.WithMessagef(err, "delete assign table, clusterID:%d, schemaID:%d, tableName:%s", req.ClusterID, req.SchemaID, req.TableName)
+	}
+	if !resp.Succeeded {
+		return ErrDeleteTableAgain.WithCausef("assign table may have been deleted, clusterID:%d, schemaID:%d, tableName:%s", req.ClusterID, req.SchemaID, req.TableName)
+	}
+
+	return nil
+}
+
+func (s *metaStorageImpl) ListTableAssignedShard(ctx context.Context, req ListAssignTableRequest) (ListTableAssignedShardResult, error) {
+	key := makeTableAssignPrefixKey(s.rootPath, uint32(req.ClusterID), uint32(req.SchemaID))
+	rangeLimit := s.opts.MaxScanLimit
+
+	var tableAssigns []TableAssign
+	do := func(key string, value []byte) error {
+		tableName := etcdutil.GetLastPathSegment(key)
+		shardIDStr := string(value)
+		shardID, err := strconv.ParseUint(shardIDStr, 10, 32)
+		if err != nil {
+			return err
+		}
+		tableAssigns = append(tableAssigns, TableAssign{
+			TableName: tableName,
+			ShardID:   ShardID(shardID),
+		})
+		return nil
+	}
+
+	if err := etcdutil.ScanWithPrefix(ctx, s.client, key, do); err != nil {
+		return ListTableAssignedShardResult{}, errors.WithMessagef(err, "scan tables, clusterID:%d, schemaID:%d, prefix key:%s, range limit:%d", req.ClusterID, req.SchemaID, key, rangeLimit)
+	}
+
+	return ListTableAssignedShardResult{TableAssigns: tableAssigns}, nil
+}
+
 func (s *metaStorageImpl) createNShardViews(ctx context.Context, clusterID ClusterID, shardViews []ShardView, ifConds []clientv3.Cmp, opCreates []clientv3.Op) error {
 	for _, shardView := range shardViews {
 		shardViewPB := convertShardViewToPB(shardView)
diff --git a/server/storage/types.go b/server/storage/types.go
index baff5e2e..5711283a 100644
--- a/server/storage/types.go
+++ b/server/storage/types.go
@@ -141,6 +141,28 @@ type DeleteTableRequest struct {
 	TableName string
 }
 
+type AssignTableToShardRequest struct {
+	ClusterID ClusterID
+	SchemaID  SchemaID
+	TableName string
+	ShardID   ShardID
+}
+
+type DeleteTableAssignedRequest struct {
+	ClusterID ClusterID
+	SchemaID  SchemaID
+	TableName string
+}
+
+type ListAssignTableRequest struct {
+	ClusterID ClusterID
+	SchemaID  SchemaID
+}
+
+type ListTableAssignedShardResult struct {
+	TableAssigns []TableAssign
+}
+
 type CreateShardViewsRequest struct {
 	ClusterID  ClusterID
 	ShardViews []ShardView
@@ -232,6 +254,11 @@ func (t Table) IsPartitioned() bool {
 	return t.PartitionInfo.Info != nil
 }
 
+type TableAssign struct {
+	TableName string
+	ShardID   ShardID
+}
+
 type ShardView struct {
 	ShardID ShardID
 	Version uint64
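The picking flow introduced by this change works as follows: PersistShardPicker first looks up a persisted table-to-shard assignment, asks the wrapped ShardPicker only for the tables that have none, and then persists the new picks, so a retried create-table procedure resolves to the same shard; createFinish deletes the assignment once the table is fully created. Below is a minimal, self-contained Go sketch of that pick-then-persist pattern; assignStore, innerPicker, and pickWithPersistence are illustrative stand-ins, not the PR's actual types or API.

package main

import "fmt"

type ShardID uint32

// assignStore is a stand-in for the persisted table->shard assignments
// kept by TopologyManager in this change.
type assignStore map[string]ShardID

// innerPicker is a stand-in for the wrapped ShardPicker (e.g. the
// least-table picker).
type innerPicker func(n int) []ShardID

// pickWithPersistence mirrors the idea of PersistShardPicker.PickShards:
// reuse any persisted assignment, pick shards only for the missing tables,
// then persist the new picks so a retry is idempotent.
func pickWithPersistence(store assignStore, pick innerPicker, tables []string) map[string]ShardID {
	result := make(map[string]ShardID, len(tables))
	var missing []string
	for _, t := range tables {
		if shard, ok := store[t]; ok {
			result[t] = shard // reuse the persisted assignment
		} else {
			missing = append(missing, t)
		}
	}
	if len(missing) == 0 {
		return result
	}
	for i, shard := range pick(len(missing)) {
		store[missing[i]] = shard // persist so a retry picks the same shard
		result[missing[i]] = shard
	}
	return result
}

func main() {
	store := assignStore{}
	next := ShardID(0)
	// A trivial round-robin picker standing in for the real inner picker.
	roundRobin := func(n int) []ShardID {
		picked := make([]ShardID, n)
		for i := range picked {
			picked[i] = next
			next++
		}
		return picked
	}

	first := pickWithPersistence(store, roundRobin, []string{"t1", "t2"})
	second := pickWithPersistence(store, roundRobin, []string{"t1", "t2"})
	fmt.Println(first["t1"] == second["t1"], first["t2"] == second["t2"]) // true true
}

Because the assignment is written to storage before the create-table procedure finishes, a retry after a coordinator restart lands on the same shard; the cleanup in createFinish then removes the assignment so it does not outlive the procedure.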