Skip to content
This repository has been archived by the owner on Feb 6, 2024. It is now read-only.

Commit

Permalink
fix: drop partition table (#231)
Browse files Browse the repository at this point in the history
  • Loading branch information
ZuLiangWang authored Sep 20, 2023
1 parent 07a50fc commit b397c74
Show file tree
Hide file tree
Showing 10 changed files with 90 additions and 31 deletions.
4 changes: 2 additions & 2 deletions server/cluster/metadata/cluster_metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ func (c *ClusterMetadata) CreateTableMetadata(ctx context.Context, request Creat
}

if exists {
return CreateTableMetadataResult{}, ErrTableAlreadyExists
return CreateTableMetadataResult{}, errors.WithMessagef(ErrTableAlreadyExists, "tableName:%s", request.TableName)
}

// Create table in table manager.
Expand Down Expand Up @@ -311,7 +311,7 @@ func (c *ClusterMetadata) CreateTable(ctx context.Context, request CreateTableRe
}

if exists {
return CreateTableResult{}, ErrTableAlreadyExists
return CreateTableResult{}, errors.WithMessagef(ErrTableAlreadyExists, "tableName:%s", request.TableName)
}

// Create table in table manager.
Expand Down
2 changes: 1 addition & 1 deletion server/cluster/metadata/table_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ func (m *TableManagerImpl) CreateTable(ctx context.Context, schemaName string, t
}

if exists {
return storage.Table{}, ErrTableAlreadyExists
return storage.Table{}, errors.WithMessagef(ErrTableAlreadyExists, "tableName:%s", tableName)
}

// Create table in storage.
Expand Down
6 changes: 3 additions & 3 deletions server/coordinator/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,9 +112,9 @@ func TestDropTable(t *testing.T) {
OnFailed: nil,
})
// Drop non-existing partition table.
re.Error(err)
re.False(ok)
re.Nil(p)
re.NoError(err)
re.True(ok)
re.NotNil(p)
}

func TestTransferLeader(t *testing.T) {
Expand Down
24 changes: 16 additions & 8 deletions server/coordinator/procedure/ddl/common_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,18 +67,25 @@ func BuildCreateTableRequest(table storage.Table, shardVersionUpdate metadata.Sh
}
}

func GetShardVersionByTableName(clusterMetadata *metadata.ClusterMetadata, schemaName, tableName string, shardVersions map[storage.ShardID]uint64) (storage.Table, metadata.ShardVersionUpdate, error) {
func GetTableMetadata(clusterMetadata *metadata.ClusterMetadata, schemaName, tableName string) (storage.Table, error) {
table, exists, err := clusterMetadata.GetTable(schemaName, tableName)
if err != nil {
return storage.Table{}, metadata.ShardVersionUpdate{}, err
return storage.Table{}, err
}
if !exists {
return storage.Table{}, metadata.ShardVersionUpdate{}, errors.WithMessage(procedure.ErrTableNotExists, "table not exists")
return storage.Table{}, errors.WithMessagef(procedure.ErrTableNotExists, "table not exists, tableName:%s", tableName)
}
return table, nil
}

// BuildShardVersionUpdate builds metadata.ShardVersionUpdate to assist DDL on the shard.
//
// If no error is returned, the returned boolean value tells whether this table is allocated to a shard.
// In some cases, we need to use this value to determine whether DDL can be executed normally.
func BuildShardVersionUpdate(table storage.Table, clusterMetadata *metadata.ClusterMetadata, shardVersions map[storage.ShardID]uint64) (metadata.ShardVersionUpdate, bool, error) {
shardNodesResult, err := clusterMetadata.GetShardNodeByTableIDs([]storage.TableID{table.ID})
if err != nil {
return storage.Table{}, metadata.ShardVersionUpdate{}, err
return metadata.ShardVersionUpdate{}, false, err
}

leader := storage.ShardNode{}
Expand All @@ -92,21 +99,22 @@ func GetShardVersionByTableName(clusterMetadata *metadata.ClusterMetadata, schem
}

if !found {
return storage.Table{}, metadata.ShardVersionUpdate{}, errors.WithMessage(procedure.ErrShardLeaderNotFound, "can't find leader")
log.Warn("table can't find leader shard", zap.String("tableName", table.Name))
return metadata.ShardVersionUpdate{}, false, nil
}

prevVersion, exists := shardVersions[leader.ID]
if !exists {
return storage.Table{}, metadata.ShardVersionUpdate{}, errors.WithMessagef(metadata.ErrShardNotFound, "shard not found in shardVersions, shardID:%d", leader.ID)
return metadata.ShardVersionUpdate{}, false, errors.WithMessagef(metadata.ErrShardNotFound, "shard not found in shardVersions, shardID:%d", leader.ID)
}

currVersion := prevVersion + 1

return table, metadata.ShardVersionUpdate{
return metadata.ShardVersionUpdate{
ShardID: leader.ID,
CurrVersion: currVersion,
PrevVersion: prevVersion,
}, nil
}, true, nil
}

func DispatchDropTable(ctx context.Context, clusterMetadata *metadata.ClusterMetadata, dispatch eventdispatch.Dispatch, schemaName string, table storage.Table, version metadata.ShardVersionUpdate) error {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ func (p *Procedure) Start(ctx context.Context) error {
}
if err := p.fsm.Event(eventCreatePartitionTable, createPartitionTableRequest); err != nil {
p.updateStateWithLock(procedure.StateFailed)
_ = p.params.OnFailed(err)
return errors.WithMessage(err, "create partition table")
}
case stateCreatePartitionTable:
Expand All @@ -147,6 +148,7 @@ func (p *Procedure) Start(ctx context.Context) error {
}
if err := p.fsm.Event(eventCreateSubTables, createPartitionTableRequest); err != nil {
p.updateStateWithLock(procedure.StateFailed)
_ = p.params.OnFailed(err)
return errors.WithMessage(err, "create data tables")
}
case stateCreateSubTables:
Expand All @@ -155,12 +157,14 @@ func (p *Procedure) Start(ctx context.Context) error {
}
if err := p.fsm.Event(eventFinish, createPartitionTableRequest); err != nil {
p.updateStateWithLock(procedure.StateFailed)
_ = p.params.OnFailed(err)
return errors.WithMessage(err, "update table shard metadata")
}
case stateFinish:
// TODO: The state update sequence here is inconsistent with the previous one. Consider refactoring the state update logic of the state machine.
p.updateStateWithLock(procedure.StateFinished)
if err := p.persist(ctx); err != nil {
_ = p.params.OnFailed(err)
return errors.WithMessage(err, "create partition table procedure persist")
}
return nil
Expand Down
2 changes: 2 additions & 0 deletions server/coordinator/procedure/ddl/createtable/create_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,10 +203,12 @@ func (p *Procedure) Start(ctx context.Context) error {
if err1 != nil {
err = errors.WithMessagef(err, "send eventFailed, err:%v", err1)
}
_ = p.params.OnFailed(err)
return errors.WithMessage(err, "send eventPrepare")
}

if err := p.fsm.Event(eventSuccess, req); err != nil {
_ = p.params.OnFailed(err)
return errors.WithMessage(err, "send eventSuccess")
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,11 +101,11 @@ func buildRelatedVersionInfo(params ProcedureParams) (procedure.RelatedVersionIn
return procedure.RelatedVersionInfo{}, errors.WithMessagef(err, "get sub table, tableName:%s", subTableName)
}
if !exists {
return procedure.RelatedVersionInfo{}, errors.WithMessagef(procedure.ErrTableNotExists, "get sub table, tableName:%s", subTableName)
continue
}
shardID, exists := tableShardMapping[table.ID]
if !exists {
return procedure.RelatedVersionInfo{}, errors.WithMessagef(metadata.ErrShardNotFound, "get shard of sub table, tableID:%d", table.ID)
continue
}
shardView, exists := params.ClusterSnapshot.Topology.ShardViewsMapping[shardID]
if !exists {
Expand Down Expand Up @@ -154,6 +154,7 @@ func (p *Procedure) Start(ctx context.Context) error {
}
if err := p.fsm.Event(eventDropDataTable, dropPartitionTableRequest); err != nil {
p.updateStateWithLock(procedure.StateFailed)
_ = p.params.OnFailed(err)
return errors.WithMessage(err, "drop partition table procedure")
}
case stateDropDataTable:
Expand All @@ -162,6 +163,7 @@ func (p *Procedure) Start(ctx context.Context) error {
}
if err := p.fsm.Event(eventDropPartitionTable, dropPartitionTableRequest); err != nil {
p.updateStateWithLock(procedure.StateFailed)
_ = p.params.OnFailed(err)
return errors.WithMessage(err, "drop partition table procedure drop data table")
}
case stateDropPartitionTable:
Expand All @@ -170,11 +172,13 @@ func (p *Procedure) Start(ctx context.Context) error {
}
if err := p.fsm.Event(eventFinish, dropPartitionTableRequest); err != nil {
p.updateStateWithLock(procedure.StateFailed)
_ = p.params.OnFailed(err)
return errors.WithMessage(err, "drop partition table procedure drop partition table")
}
case stateFinish:
p.updateStateWithLock(procedure.StateFinished)
if err := p.persist(ctx); err != nil {
_ = p.params.OnFailed(err)
return errors.WithMessage(err, "drop partition table procedure persist")
}
return nil
Expand Down Expand Up @@ -278,11 +282,28 @@ func dropDataTablesCallback(event *fsm.Event) {

shardVersions := req.p.relatedVersionInfo.ShardWithVersion
for _, tableName := range params.SourceReq.PartitionTableInfo.SubTableNames {
table, shardVersionUpdate, err := ddl.GetShardVersionByTableName(params.ClusterMetadata, req.schemaName(), tableName, shardVersions)
table, err := ddl.GetTableMetadata(params.ClusterMetadata, req.schemaName(), tableName)
if err != nil {
procedure.CancelEventWithLog(event, err, fmt.Sprintf("get shard version by table, table:%s", tableName))
log.Warn("get table metadata failed", zap.String("tableName", tableName))
continue
}

shardVersionUpdate, shardExists, err := ddl.BuildShardVersionUpdate(table, params.ClusterMetadata, shardVersions)
if err != nil {
log.Error("get shard version by table", zap.String("tableName", tableName), zap.Error(err))
procedure.CancelEventWithLog(event, err, "build shard version update", zap.String("tableName", tableName))
return
}
// If the shard corresponding to this table does not exist, it means that the actual table creation failed.
// In order to ensure that the table can be deleted normally, we need to directly delete the metadata of the table.
if !shardExists {
_, err := params.ClusterMetadata.DropTableMetadata(req.ctx, req.schemaName(), tableName)
if err != nil {
procedure.CancelEventWithLog(event, err, "drop table metadata", zap.String("tableName", tableName))
return
}
continue
}

if err := ddl.DispatchDropTable(req.ctx, params.ClusterMetadata, params.Dispatch, params.SourceReq.GetSchemaName(), table, shardVersionUpdate); err != nil {
procedure.CancelEventWithLog(event, err, fmt.Sprintf("drop table, table:%s", tableName))
Expand Down
36 changes: 26 additions & 10 deletions server/coordinator/procedure/ddl/droptable/drop_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,33 @@ func prepareCallback(event *fsm.Event) {
}
params := req.p.params

table, shardVersionUpdate, err := ddl.GetShardVersionByTableName(params.ClusterMetadata, params.SourceReq.GetSchemaName(), params.SourceReq.GetName(), req.p.relatedVersionInfo.ShardWithVersion)
table, err := ddl.GetTableMetadata(params.ClusterMetadata, params.SourceReq.GetSchemaName(), params.SourceReq.GetName())
if err != nil {
procedure.CancelEventWithLog(event, err, "get shard version by table name", zap.String("tableName", params.SourceReq.GetName()))
procedure.CancelEventWithLog(event, err, "get table metadata", zap.String("tableName", params.SourceReq.GetName()), zap.Error(err))
return
}
req.ret = metadata.TableInfo{
ID: table.ID,
Name: table.Name,
SchemaID: table.SchemaID,
SchemaName: params.SourceReq.GetSchemaName(),
PartitionInfo: table.PartitionInfo,
}

shardVersionUpdate, shardExists, err := ddl.BuildShardVersionUpdate(table, params.ClusterMetadata, req.p.relatedVersionInfo.ShardWithVersion)
if err != nil {
log.Error("get shard version by table", zap.String("tableName", params.SourceReq.GetName()), zap.Bool("shardExists", shardExists), zap.Error(err))
procedure.CancelEventWithLog(event, err, "get shard version by table name", zap.String("tableName", params.SourceReq.GetName()), zap.Bool("shardExists", shardExists), zap.Error(err))
return
}
// If the shard corresponding to this table does not exist, it means that the actual table creation failed.
// In order to ensure that the table can be deleted normally, we need to directly delete the metadata of the table.
if !shardExists {
_, err = params.ClusterMetadata.DropTable(req.ctx, params.SourceReq.GetSchemaName(), params.SourceReq.GetName())
if err != nil {
procedure.CancelEventWithLog(event, err, "drop table metadata", zap.String("tableName", params.SourceReq.GetName()))
return
}
return
}

Expand All @@ -77,14 +101,6 @@ func prepareCallback(event *fsm.Event) {
procedure.CancelEventWithLog(event, procedure.ErrDropTableResult, fmt.Sprintf("legnth of shardVersionResult is %d", len(result.ShardVersionUpdate)))
return
}

req.ret = metadata.TableInfo{
ID: table.ID,
Name: table.Name,
SchemaID: table.SchemaID,
SchemaName: params.SourceReq.GetSchemaName(),
PartitionInfo: table.PartitionInfo,
}
}

func successCallback(event *fsm.Event) {
Expand Down
6 changes: 4 additions & 2 deletions server/coordinator/procedure/manager_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,12 +156,14 @@ func (m *ManagerImpl) startProcedurePromoteInternal(ctx context.Context, procedu

func (m *ManagerImpl) startProcedureWorker(ctx context.Context, newProcedure Procedure, procedureWorkerChan chan struct{}) {
go func() {
start := time.Now()
m.logger.Info("procedure start", zap.Uint64("procedureID", newProcedure.ID()))
err := newProcedure.Start(ctx)
if err != nil {
m.logger.Error("procedure start failed", zap.Error(err))
m.logger.Error("procedure start failed", zap.Error(err), zap.Int64("costTime", time.Since(start).Milliseconds()))
} else {
m.logger.Info("procedure start finish", zap.Uint64("procedureID", newProcedure.ID()), zap.Int64("costTime", time.Since(start).Milliseconds()))
}
m.logger.Info("procedure finish", zap.Uint64("procedureID", newProcedure.ID()))
for shardID := range newProcedure.RelatedVersionInfo().ShardWithVersion {
m.lock.Lock()
delete(m.runningProcedures, shardID)
Expand Down
8 changes: 7 additions & 1 deletion server/service/grpc/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ func (s *Service) GetTablesOfShards(ctx context.Context, req *metaservicepb.GetT

// CreateTable implements gRPC CeresmetaServer.
func (s *Service) CreateTable(ctx context.Context, req *metaservicepb.CreateTableRequest) (*metaservicepb.CreateTableResponse, error) {
start := time.Now()
// Since there may be too many table creation requests, a flow limiter is added here.
if ok, err := s.allow(); !ok {
return &metaservicepb.CreateTableResponse{Header: responseHeader(err, "create table grpc request is rejected by flow limiter")}, nil
Expand Down Expand Up @@ -201,6 +202,7 @@ func (s *Service) CreateTable(ctx context.Context, req *metaservicepb.CreateTabl

select {
case ret := <-resultCh:
log.Info("create table finish", zap.String("tableName", req.Name), zap.Int64("costTime", time.Since(start).Milliseconds()))
return &metaservicepb.CreateTableResponse{
Header: okResponseHeader(),
CreatedTable: &metaservicepb.TableInfo{
Expand All @@ -216,12 +218,14 @@ func (s *Service) CreateTable(ctx context.Context, req *metaservicepb.CreateTabl
},
}, nil
case err = <-errorCh:
log.Warn("create table failed", zap.String("tableName", req.Name), zap.Int64("costTime", time.Since(start).Milliseconds()))
return &metaservicepb.CreateTableResponse{Header: responseHeader(err, "create table")}, nil
}
}

// DropTable implements gRPC CeresmetaServer.
func (s *Service) DropTable(ctx context.Context, req *metaservicepb.DropTableRequest) (*metaservicepb.DropTableResponse, error) {
start := time.Now()
// Since there may be too many table dropping requests, a flow limiter is added here.
if ok, err := s.allow(); !ok {
return &metaservicepb.DropTableResponse{Header: responseHeader(err, "drop table grpc request is rejected by flow limiter")}, nil
Expand Down Expand Up @@ -275,17 +279,19 @@ func (s *Service) DropTable(ctx context.Context, req *metaservicepb.DropTableReq

err = c.GetProcedureManager().Submit(ctx, procedure)
if err != nil {
log.Error("fail to drop table, manager submit procedure", zap.Error(err))
log.Error("fail to drop table, manager submit procedure", zap.Error(err), zap.Int64("costTime", time.Since(start).Milliseconds()))
return &metaservicepb.DropTableResponse{Header: responseHeader(err, "drop table")}, nil
}

select {
case ret := <-resultCh:
log.Info("drop table finish", zap.String("tableName", req.Name), zap.Int64("costTime", time.Since(start).Milliseconds()))
return &metaservicepb.DropTableResponse{
Header: okResponseHeader(),
DroppedTable: metadata.ConvertTableInfoToPB(ret),
}, nil
case err = <-errorCh:
log.Info("drop table failed", zap.String("tableName", req.Name), zap.Int64("costTime", time.Since(start).Milliseconds()))
return &metaservicepb.DropTableResponse{Header: responseHeader(err, "drop table")}, nil
}
}
Expand Down

0 comments on commit b397c74

Please sign in to comment.