Skip to content
This repository has been archived by the owner on Feb 6, 2024. It is now read-only.

Commit

Permalink
refactor by CR
Browse files Browse the repository at this point in the history
  • Loading branch information
chunshao90 committed Jan 11, 2023
1 parent bfa63b8 commit 4caf272
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 33 deletions.
20 changes: 10 additions & 10 deletions server/coordinator/eventdispatch/dispatch_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,10 @@ func (d *DispatchImpl) OpenShard(ctx context.Context, addr string, request OpenS
Shard: cluster.ConvertShardsInfoToPB(request.Shard),
})
if err != nil {
return errors.WithMessage(err, "open shard")
return errors.WithMessagef(err, "open shard, addr:%s, request:%v", addr, request)
}
if resp.GetHeader().Code != 0 {
return ErrDispatch.WithCausef("open shard, err:%s", resp.GetHeader().GetError())
return ErrDispatch.WithCausef("open shard, addr:%s, request:%v, err:%s", addr, request, resp.GetHeader().GetError())
}
return nil
}
Expand All @@ -50,10 +50,10 @@ func (d *DispatchImpl) CloseShard(ctx context.Context, addr string, request Clos
ShardId: request.ShardID,
})
if err != nil {
return errors.WithMessage(err, "close shard")
return errors.WithMessagef(err, "close shard, addr:%s, request:%v", addr, request)
}
if resp.GetHeader().Code != 0 {
return ErrDispatch.WithCausef("close shard, err:%s", resp.GetHeader().GetError())
return ErrDispatch.WithCausef("close shard, addr:%s, request:%v, err:%s", addr, request, resp.GetHeader().GetError())
}
return nil
}
Expand All @@ -65,10 +65,10 @@ func (d *DispatchImpl) CreateTableOnShard(ctx context.Context, addr string, requ
}
resp, err := client.CreateTableOnShard(ctx, convertCreateTableOnShardRequestToPB(request))
if err != nil {
return errors.WithMessage(err, "create table on shard")
return errors.WithMessagef(err, "create table on shard, addr:%s, request:%v", addr, request)
}
if resp.GetHeader().Code != 0 {
return ErrDispatch.WithCausef("create table on shard, err:%s", resp.GetHeader().GetError())
return ErrDispatch.WithCausef("create table on shard, addr:%s, request:%v, err:%s", addr, request, resp.GetHeader().GetError())
}
return nil
}
Expand All @@ -80,10 +80,10 @@ func (d *DispatchImpl) DropTableOnShard(ctx context.Context, addr string, reques
}
resp, err := client.DropTableOnShard(ctx, convertDropTableOnShardRequestToPB(request))
if err != nil {
return errors.WithMessage(err, "drop table on shard")
return errors.WithMessagef(err, "drop table on shard, addr:%s, request:%v", addr, request)
}
if resp.GetHeader().Code != 0 {
return ErrDispatch.WithCausef("drop table on shard, err:%s", resp.GetHeader().GetError())
return ErrDispatch.WithCausef("drop table on shard, addr:%s, request:%v, err:%s", addr, request, resp.GetHeader().GetError())
}
return nil
}
Expand All @@ -96,10 +96,10 @@ func (d *DispatchImpl) CloseTableOnShard(ctx context.Context, addr string, reque

resp, err := client.CloseTableOnShard(ctx, convertCloseTableOnShardRequestToPB(request))
if err != nil {
return errors.WithMessage(err, "close table on shard")
return errors.WithMessagef(err, "close table on shard, addr:%s, request:%v", addr, request)
}
if resp.GetHeader().Code != 0 {
return ErrDispatch.WithCausef("close table on shard, err:%s", resp.GetHeader().GetError())
return ErrDispatch.WithCausef("close table on shard, addr:%s, request:%v, err:%s", addr, request, resp.GetHeader().GetError())
}
return nil
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ func TestCreateAndDropPartitionTable(t *testing.T) {
subTableNames := genSubTables(tableName, testSubTableNum)
testDropPartitionTable(t, dispatch, c, s, tableName, subTableNames)
}

// Check table not exists.
for i := 0; i < testTableNum; i++ {
tableName := fmt.Sprintf("%s_%d", testTableName0, i)
Expand Down
9 changes: 4 additions & 5 deletions server/coordinator/procedure/create_partition_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,33 +128,32 @@ func (p *CreatePartitionTableProcedure) Start(ctx context.Context) error {
}
if err := p.fsm.Event(eventCreatePartitionTable, createPartitionTableRequest); err != nil {
p.updateStateWithLock(StateFailed)
return errors.WithMessagef(err, "create partition table procedure create new shard view")
return errors.WithMessage(err, "create partition table procedure create new shard view")
}
case stateCreatePartitionTable:
if err := p.persist(ctx); err != nil {
return errors.WithMessage(err, "create partition table procedure persist")
}
if err := p.fsm.Event(eventCreateDataTables, createPartitionTableRequest); err != nil {
p.updateStateWithLock(StateFailed)
return errors.WithMessagef(err, "create partition table procedure create partition table")
return errors.WithMessage(err, "create partition table procedure create partition table")
}
case stateCreateDataTables:
if err := p.persist(ctx); err != nil {
return errors.WithMessage(err, "create partition table procedure persist")
}
if err := p.fsm.Event(eventOpenPartitionTables, createPartitionTableRequest); err != nil {
p.updateStateWithLock(StateFailed)
return errors.WithMessagef(err, "create partition table procedure create data tables")
return errors.WithMessage(err, "create partition table procedure create data tables")
}
case stateOpenPartitionTables:
if err := p.persist(ctx); err != nil {
return errors.WithMessage(err, "create partition table procedure persist")
}
if err := p.fsm.Event(eventFinish, createPartitionTableRequest); err != nil {
p.updateStateWithLock(StateFailed)
return errors.WithMessagef(err, "create partition table procedure open partition tables")
return errors.WithMessage(err, "create partition table procedure open partition tables")
}
p.updateStateWithLock(StateFinished)
case stateFinish:
// TODO: The state update sequence here is inconsistent with the previous one. Consider reconstructing the state update logic of the state machine.
p.updateStateWithLock(StateFinished)
Expand Down
49 changes: 31 additions & 18 deletions server/coordinator/procedure/drop_partition_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@ const (
eventDropDataTable = "EventDropDataTable"
eventDropPartitionTable = "EventDropPartitionTable"
eventClosePartitionTables = "EventClosePartitionTables"
eventDropPartitionTableFinish = "eventDropPartitionTableFinish"
eventDropPartitionTableFinish = "EventDropPartitionTableFinish"

stateDropPartitionTableBegin = "StateBegin"
stateDropPartitionTableBegin = "StateDropPartitionBegin"
stateDropDataTable = "StateDropDataTable"
stateDropPartitionTable = "StateDropPartitionTable"
stateClosePartitionTables = "StateClosePartitionTables"
Expand Down Expand Up @@ -122,31 +122,31 @@ func (p *DropPartitionTableProcedure) Start(ctx context.Context) error {
}
if err := p.fsm.Event(eventDropDataTable, dropPartitionTableRequest); err != nil {
p.updateStateWithLock(StateFailed)
return errors.WithMessagef(err, "drop partition table procedure")
return errors.WithMessage(err, "drop partition table procedure")
}
case stateDropDataTable:
if err := p.persist(ctx); err != nil {
return errors.WithMessage(err, "drop partition table procedure persist")
}
if err := p.fsm.Event(eventDropPartitionTable, dropPartitionTableRequest); err != nil {
p.updateStateWithLock(StateFailed)
return errors.WithMessagef(err, "drop partition table procedure drop data table")
return errors.WithMessage(err, "drop partition table procedure drop data table")
}
case stateDropPartitionTable:
if err := p.persist(ctx); err != nil {
return errors.WithMessage(err, "drop partition table procedure persist")
}
if err := p.fsm.Event(eventClosePartitionTables, dropPartitionTableRequest); err != nil {
p.updateStateWithLock(StateFailed)
return errors.WithMessagef(err, "drop partition table procedure drop partition table")
return errors.WithMessage(err, "drop partition table procedure drop partition table")
}
case stateClosePartitionTables:
if err := p.persist(ctx); err != nil {
return errors.WithMessage(err, "drop partition table procedure persist")
}
if err := p.fsm.Event(eventDropPartitionTableFinish, dropPartitionTableRequest); err != nil {
p.updateStateWithLock(StateFailed)
return errors.WithMessagef(err, "drop partition table procedure close partition tables")
return errors.WithMessage(err, "drop partition table procedure close partition tables")
}
p.updateStateWithLock(StateFinished)
case stateDropPartitionTableFinish:
Expand Down Expand Up @@ -253,19 +253,28 @@ func dropDataTablesCallback(event *fsm.Event) {
return
}

if len(req.sourceReq.PartitionTableInfo.SubTableNames) == 0 {
cancelEventWithLog(event, ErrEmptyPartitionNames, fmt.Sprintf("drop table, table:%s", req.sourceReq.Name))
return
}

for _, tableName := range req.sourceReq.PartitionTableInfo.SubTableNames {
table, dropTableRet, err := dropTableMetaData(event, tableName)
table, dropTableResult, exists, err := dropTableMetaData(event, tableName)
if err != nil {
cancelEventWithLog(event, err, fmt.Sprintf("drop table, table:%s", tableName))
return
}

if len(dropTableRet.ShardVersionUpdate) != 1 {
cancelEventWithLog(event, ErrDropTableResult, fmt.Sprintf("legnth of shardVersionResult!=1, current is %d", len(dropTableRet.ShardVersionUpdate)))
if !exists {
continue
}

if len(dropTableResult.ShardVersionUpdate) != 1 {
cancelEventWithLog(event, ErrDropTableResult, fmt.Sprintf("legnth of shardVersionResult!=1, current is %d", len(dropTableResult.ShardVersionUpdate)))
return
}

if err := dispatchDropTable(event, table, dropTableRet.ShardVersionUpdate[0]); err != nil {
if err := dispatchDropTable(event, table, dropTableResult.ShardVersionUpdate[0]); err != nil {
cancelEventWithLog(event, err, fmt.Sprintf("drop table, table:%s", tableName))
return
}
Expand All @@ -280,9 +289,13 @@ func dropPartitionTableCallback(event *fsm.Event) {
return
}

table, dropTableRet, err := dropTableMetaData(event, request.tableName())
table, dropTableRet, exists, err := dropTableMetaData(event, request.tableName())
if err != nil {
cancelEventWithLog(event, err, fmt.Sprintf("drop table, table:%s", table.Name))
cancelEventWithLog(event, err, fmt.Sprintf("drop table, table:%s", request.tableName()))
return
}
if !exists {
cancelEventWithLog(event, ErrTableNotExists, fmt.Sprintf("table:%s", request.tableName()))
return
}

Expand Down Expand Up @@ -359,27 +372,27 @@ func finishDropPartitionTableCallback(event *fsm.Event) {
}
}

func dropTableMetaData(event *fsm.Event, tableName string) (storage.Table, cluster.DropTableResult, error) {
func dropTableMetaData(event *fsm.Event, tableName string) (storage.Table, cluster.DropTableResult, bool, error) {
request, err := getRequestFromEvent[*dropPartitionTableCallbackRequest](event)
if err != nil {
return storage.Table{}, cluster.DropTableResult{}, errors.WithMessage(err, "get request from event")
return storage.Table{}, cluster.DropTableResult{}, false, errors.WithMessage(err, "get request from event")
}

table, exists, err := request.cluster.GetTable(request.schemaName(), tableName)
if err != nil {
return storage.Table{}, cluster.DropTableResult{}, errors.WithMessage(err, "cluster get table")
return storage.Table{}, cluster.DropTableResult{}, false, errors.WithMessage(err, "cluster get table")
}
if !exists {
log.Warn("drop non-existing table", zap.String("schema", request.schemaName()), zap.String("table", tableName))
return storage.Table{}, cluster.DropTableResult{}, ErrTableNotExists
return storage.Table{}, cluster.DropTableResult{}, false, nil
}

result, err := request.cluster.DropTable(request.ctx, request.schemaName(), tableName)
if err != nil {
return storage.Table{}, cluster.DropTableResult{}, errors.WithMessage(err, "cluster drop table")
return storage.Table{}, cluster.DropTableResult{}, false, errors.WithMessage(err, "cluster drop table")
}

return table, result, nil
return table, result, true, nil
}

func dispatchDropTable(event *fsm.Event, table storage.Table, version cluster.ShardVersionUpdate) error {
Expand Down

0 comments on commit 4caf272

Please sign in to comment.