Skip to content
This repository has been archived by the owner on Feb 6, 2024. It is now read-only.

Commit

Permalink
fix drop partition table
Browse files Browse the repository at this point in the history
  • Loading branch information
ZuLiangWang committed Sep 15, 2023
1 parent bb34836 commit f4f307e
Show file tree
Hide file tree
Showing 8 changed files with 62 additions and 19 deletions.
4 changes: 2 additions & 2 deletions server/cluster/metadata/cluster_metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ func (c *ClusterMetadata) CreateTableMetadata(ctx context.Context, request Creat
}

if exists {
return CreateTableMetadataResult{}, ErrTableAlreadyExists
return CreateTableMetadataResult{}, errors.WithMessagef(ErrTableAlreadyExists, "table name %s", request.TableName)
}

// Create table in table manager.
Expand Down Expand Up @@ -311,7 +311,7 @@ func (c *ClusterMetadata) CreateTable(ctx context.Context, request CreateTableRe
}

if exists {
return CreateTableResult{}, ErrTableAlreadyExists
return CreateTableResult{}, errors.WithMessagef(ErrTableAlreadyExists, "table name:%s", request.TableName)
}

// Create table in table manager.
Expand Down
2 changes: 1 addition & 1 deletion server/cluster/metadata/table_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ func (m *TableManagerImpl) CreateTable(ctx context.Context, schemaName string, t
}

if exists {
return storage.Table{}, ErrTableAlreadyExists
return storage.Table{}, errors.WithMessagef(ErrTableAlreadyExists, "table name:%s ", tableName)
}

// Create table in storage.
Expand Down
17 changes: 10 additions & 7 deletions server/coordinator/procedure/ddl/common_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,18 +67,21 @@ func BuildCreateTableRequest(table storage.Table, shardVersionUpdate metadata.Sh
}
}

func GetShardVersionByTableName(clusterMetadata *metadata.ClusterMetadata, schemaName, tableName string, shardVersions map[storage.ShardID]uint64) (storage.Table, metadata.ShardVersionUpdate, error) {
func GetTableMetadata(clusterMetadata *metadata.ClusterMetadata, schemaName, tableName string) (storage.Table, error) {
table, exists, err := clusterMetadata.GetTable(schemaName, tableName)
if err != nil {
return storage.Table{}, metadata.ShardVersionUpdate{}, err
return storage.Table{}, err
}
if !exists {
return storage.Table{}, metadata.ShardVersionUpdate{}, errors.WithMessage(procedure.ErrTableNotExists, "table not exists")
return storage.Table{}, errors.WithMessage(procedure.ErrTableNotExists, "table not exists")
}
return table, nil
}

func GetTableShardVersion(table storage.Table, clusterMetadata *metadata.ClusterMetadata, shardVersions map[storage.ShardID]uint64) (metadata.ShardVersionUpdate, error) {
shardNodesResult, err := clusterMetadata.GetShardNodeByTableIDs([]storage.TableID{table.ID})
if err != nil {
return storage.Table{}, metadata.ShardVersionUpdate{}, err
return metadata.ShardVersionUpdate{}, err
}

leader := storage.ShardNode{}
Expand All @@ -92,17 +95,17 @@ func GetShardVersionByTableName(clusterMetadata *metadata.ClusterMetadata, schem
}

if !found {
return storage.Table{}, metadata.ShardVersionUpdate{}, errors.WithMessage(procedure.ErrShardLeaderNotFound, "can't find leader")
return metadata.ShardVersionUpdate{}, errors.WithMessagef(procedure.ErrShardLeaderNotFound, "can't find leader, shardID: %d", shardNodesResult.ShardNodes[table.ID])
}

prevVersion, exists := shardVersions[leader.ID]
if !exists {
return storage.Table{}, metadata.ShardVersionUpdate{}, errors.WithMessagef(metadata.ErrShardNotFound, "shard not found in shardVersions, shardID:%d", leader.ID)
return metadata.ShardVersionUpdate{}, errors.WithMessagef(metadata.ErrShardNotFound, "shard not found in shardVersions, shardID:%d", leader.ID)
}

currVersion := prevVersion + 1

return table, metadata.ShardVersionUpdate{
return metadata.ShardVersionUpdate{
ShardID: leader.ID,
CurrVersion: currVersion,
PrevVersion: prevVersion,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ func (p *Procedure) Start(ctx context.Context) error {
}
if err := p.fsm.Event(eventCreatePartitionTable, createPartitionTableRequest); err != nil {
p.updateStateWithLock(procedure.StateFailed)
_ = p.params.OnFailed(err)
return errors.WithMessage(err, "create partition table")
}
case stateCreatePartitionTable:
Expand All @@ -147,6 +148,7 @@ func (p *Procedure) Start(ctx context.Context) error {
}
if err := p.fsm.Event(eventCreateSubTables, createPartitionTableRequest); err != nil {
p.updateStateWithLock(procedure.StateFailed)
_ = p.params.OnFailed(err)
return errors.WithMessage(err, "create data tables")
}
case stateCreateSubTables:
Expand All @@ -155,12 +157,14 @@ func (p *Procedure) Start(ctx context.Context) error {
}
if err := p.fsm.Event(eventFinish, createPartitionTableRequest); err != nil {
p.updateStateWithLock(procedure.StateFailed)
_ = p.params.OnFailed(err)
return errors.WithMessage(err, "update table shard metadata")
}
case stateFinish:
// TODO: The state update sequence here is inconsistent with the previous one. Consider reconstructing the state update logic of the state machine.
p.updateStateWithLock(procedure.StateFinished)
if err := p.persist(ctx); err != nil {
_ = p.params.OnFailed(err)
return errors.WithMessage(err, "create partition table procedure persist")
}
return nil
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,11 +101,11 @@ func buildRelatedVersionInfo(params ProcedureParams) (procedure.RelatedVersionIn
return procedure.RelatedVersionInfo{}, errors.WithMessagef(err, "get sub table, tableName:%s", subTableName)
}
if !exists {
return procedure.RelatedVersionInfo{}, errors.WithMessagef(procedure.ErrTableNotExists, "get sub table, tableName:%s", subTableName)
continue
}
shardID, exists := tableShardMapping[table.ID]
if !exists {
return procedure.RelatedVersionInfo{}, errors.WithMessagef(metadata.ErrShardNotFound, "get shard of sub table, tableID:%d", table.ID)
continue
}
shardView, exists := params.ClusterSnapshot.Topology.ShardViewsMapping[shardID]
if !exists {
Expand Down Expand Up @@ -154,6 +154,7 @@ func (p *Procedure) Start(ctx context.Context) error {
}
if err := p.fsm.Event(eventDropDataTable, dropPartitionTableRequest); err != nil {
p.updateStateWithLock(procedure.StateFailed)
_ = p.params.OnFailed(err)
return errors.WithMessage(err, "drop partition table procedure")
}
case stateDropDataTable:
Expand All @@ -162,6 +163,7 @@ func (p *Procedure) Start(ctx context.Context) error {
}
if err := p.fsm.Event(eventDropPartitionTable, dropPartitionTableRequest); err != nil {
p.updateStateWithLock(procedure.StateFailed)
_ = p.params.OnFailed(err)
return errors.WithMessage(err, "drop partition table procedure drop data table")
}
case stateDropPartitionTable:
Expand All @@ -170,11 +172,13 @@ func (p *Procedure) Start(ctx context.Context) error {
}
if err := p.fsm.Event(eventFinish, dropPartitionTableRequest); err != nil {
p.updateStateWithLock(procedure.StateFailed)
_ = p.params.OnFailed(err)
return errors.WithMessage(err, "drop partition table procedure drop partition table")
}
case stateFinish:
p.updateStateWithLock(procedure.StateFinished)
if err := p.persist(ctx); err != nil {
_ = p.params.OnFailed(err)
return errors.WithMessage(err, "drop partition table procedure persist")
}
return nil
Expand Down Expand Up @@ -278,10 +282,23 @@ func dropDataTablesCallback(event *fsm.Event) {

shardVersions := req.p.relatedVersionInfo.ShardWithVersion
for _, tableName := range params.SourceReq.PartitionTableInfo.SubTableNames {
table, shardVersionUpdate, err := ddl.GetShardVersionByTableName(params.ClusterMetadata, req.schemaName(), tableName, shardVersions)
table, err := ddl.GetTableMetadata(params.ClusterMetadata, req.schemaName(), tableName)
if err != nil {
log.Warn("get table metadata failed", zap.String("tableName", tableName))
continue
}

shardVersionUpdate, err := ddl.GetTableShardVersion(table, params.ClusterMetadata, shardVersions)
if err != nil {
procedure.CancelEventWithLog(event, err, fmt.Sprintf("get shard version by table, table:%s", tableName))
return
_, err := params.ClusterMetadata.DropTableMetadata(req.ctx, req.schemaName(), tableName)
if err != nil {
procedure.CancelEventWithLog(event, err, "drop table metadata", zap.String("tableName", tableName))
return
}
log.Warn("get shard version by table", zap.String("tableName", tableName), zap.Error(err))
continue
}

if err := ddl.DispatchDropTable(req.ctx, params.ClusterMetadata, params.Dispatch, params.SourceReq.GetSchemaName(), table, shardVersionUpdate); err != nil {
Expand Down
18 changes: 15 additions & 3 deletions server/coordinator/procedure/ddl/droptable/drop_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,21 @@ func prepareCallback(event *fsm.Event) {
}
params := req.p.params

table, shardVersionUpdate, err := ddl.GetShardVersionByTableName(params.ClusterMetadata, params.SourceReq.GetSchemaName(), params.SourceReq.GetName(), req.p.relatedVersionInfo.ShardWithVersion)
table, err := ddl.GetTableMetadata(params.ClusterMetadata, params.SourceReq.GetSchemaName(), params.SourceReq.GetName())
if err != nil {
procedure.CancelEventWithLog(event, err, "get shard version by table name", zap.String("tableName", params.SourceReq.GetName()))
procedure.CancelEventWithLog(event, err, "get table metadata", zap.String("tableName", params.SourceReq.GetName()), zap.Error(err))
return
}

shardVersionUpdate, err := ddl.GetTableShardVersion(table, params.ClusterMetadata, req.p.relatedVersionInfo.ShardWithVersion)
if err != nil {
_, err = params.ClusterMetadata.DropTable(req.ctx, params.SourceReq.GetSchemaName(), params.SourceReq.GetName())
if err != nil {
procedure.CancelEventWithLog(event, err, "drop table metadata", zap.String("tableName", params.SourceReq.GetName()))
return
}
log.Error("get shard version by table", zap.String("tableName", params.SourceReq.GetName()), zap.Error(err))
procedure.CancelEventWithLog(event, err, "get shard version by table name", zap.String("tableName", params.SourceReq.GetName()), zap.Error(err))
return
}

Expand Down Expand Up @@ -170,7 +182,7 @@ func validateTable(params ProcedureParams) (storage.ShardID, error) {
}
if !exists {
log.Error("drop non-existing table", zap.String("schema", params.SourceReq.GetSchemaName()), zap.String("table", params.SourceReq.GetName()))
return 0, err
return 0, errors.WithMessagef(metadata.ErrTableNotFound, "The table to drop doesn't exists, schema:%s, table:%s", params.SourceReq.GetSchemaName(), params.SourceReq.GetName())
}

for _, shardView := range params.ClusterSnapshot.Topology.ShardViewsMapping {
Expand Down
5 changes: 3 additions & 2 deletions server/coordinator/procedure/manager_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,12 +156,13 @@ func (m *ManagerImpl) startProcedurePromoteInternal(ctx context.Context, procedu

func (m *ManagerImpl) startProcedureWorker(ctx context.Context, newProcedure Procedure, procedureWorkerChan chan struct{}) {
go func() {
start := time.Now()
m.logger.Info("procedure start", zap.Uint64("procedureID", newProcedure.ID()))
err := newProcedure.Start(ctx)
if err != nil {
m.logger.Error("procedure start failed", zap.Error(err))
m.logger.Error("procedure start failed", zap.Error(err), zap.Int64("costTime", time.Since(start).Microseconds()))
}
m.logger.Info("procedure finish", zap.Uint64("procedureID", newProcedure.ID()))
m.logger.Info("procedure finish", zap.Uint64("procedureID", newProcedure.ID()), zap.Int64("costTime", time.Since(start).Microseconds()))
for shardID := range newProcedure.RelatedVersionInfo().ShardWithVersion {
m.lock.Lock()
delete(m.runningProcedures, shardID)
Expand Down
8 changes: 7 additions & 1 deletion server/service/grpc/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ func (s *Service) GetTablesOfShards(ctx context.Context, req *metaservicepb.GetT

// CreateTable implements gRPC CeresmetaServer.
func (s *Service) CreateTable(ctx context.Context, req *metaservicepb.CreateTableRequest) (*metaservicepb.CreateTableResponse, error) {
start := time.Now()
// Since there may be too many table creation requests, a flow limiter is added here.
if ok, err := s.allow(); !ok {
return &metaservicepb.CreateTableResponse{Header: responseHeader(err, "create table grpc request is rejected by flow limiter")}, nil
Expand Down Expand Up @@ -201,6 +202,7 @@ func (s *Service) CreateTable(ctx context.Context, req *metaservicepb.CreateTabl

select {
case ret := <-resultCh:
log.Info("create table finish", zap.String("tableName", req.Name), zap.Int64("costTime", time.Since(start).Milliseconds()))
return &metaservicepb.CreateTableResponse{
Header: okResponseHeader(),
CreatedTable: &metaservicepb.TableInfo{
Expand All @@ -216,12 +218,14 @@ func (s *Service) CreateTable(ctx context.Context, req *metaservicepb.CreateTabl
},
}, nil
case err = <-errorCh:
log.Info("create table finish", zap.String("tableName", req.Name), zap.Int64("costTime", time.Since(start).Milliseconds()))
return &metaservicepb.CreateTableResponse{Header: responseHeader(err, "create table")}, nil
}
}

// DropTable implements gRPC CeresmetaServer.
func (s *Service) DropTable(ctx context.Context, req *metaservicepb.DropTableRequest) (*metaservicepb.DropTableResponse, error) {
start := time.Now()
// Since there may be too many table dropping requests, a flow limiter is added here.
if ok, err := s.allow(); !ok {
return &metaservicepb.DropTableResponse{Header: responseHeader(err, "drop table grpc request is rejected by flow limiter")}, nil
Expand Down Expand Up @@ -270,17 +274,19 @@ func (s *Service) DropTable(ctx context.Context, req *metaservicepb.DropTableReq
}
err = c.GetProcedureManager().Submit(ctx, procedure)
if err != nil {
log.Error("fail to drop table, manager submit procedure", zap.Error(err))
log.Error("fail to drop table, manager submit procedure", zap.Error(err), zap.Int64("costTime", time.Since(start).Milliseconds()))
return &metaservicepb.DropTableResponse{Header: responseHeader(err, "drop table")}, nil
}

select {
case ret := <-resultCh:
log.Info("drop table finish", zap.String("tableName", req.Name), zap.Int64("costTime", time.Since(start).Milliseconds()))
return &metaservicepb.DropTableResponse{
Header: okResponseHeader(),
DroppedTable: metadata.ConvertTableInfoToPB(ret),
}, nil
case err = <-errorCh:
log.Info("drop table err", zap.String("tableName", req.Name), zap.Int64("costTime", time.Since(start).Milliseconds()))
return &metaservicepb.DropTableResponse{Header: responseHeader(err, "drop table")}, nil
}
}
Expand Down

0 comments on commit f4f307e

Please sign in to comment.