Skip to content
This repository has been archived by the owner on Feb 6, 2024. It is now read-only.

Commit

Permalink
refactor: refactor by cr
Browse files Browse the repository at this point in the history
  • Loading branch information
ZuLiangWang committed Apr 13, 2023
1 parent adeff11 commit 53f264e
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@ package createpartitiontable
import (
"context"
"encoding/json"
"github.com/CeresDB/ceresmeta/server/cluster/metadata"
"sync"

"github.com/CeresDB/ceresdbproto/golang/pkg/metaservicepb"
"github.com/CeresDB/ceresmeta/pkg/log"
"github.com/CeresDB/ceresmeta/server/cluster/metadata"
"github.com/CeresDB/ceresmeta/server/coordinator/eventdispatch"
"github.com/CeresDB/ceresmeta/server/coordinator/procedure"
"github.com/CeresDB/ceresmeta/server/storage"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@ package createtable

import (
"context"
"github.com/CeresDB/ceresmeta/server/cluster/metadata"
"sync"

"github.com/CeresDB/ceresdbproto/golang/pkg/metaservicepb"
"github.com/CeresDB/ceresmeta/pkg/log"
"github.com/CeresDB/ceresmeta/server/cluster/metadata"
"github.com/CeresDB/ceresmeta/server/coordinator/eventdispatch"
"github.com/CeresDB/ceresmeta/server/coordinator/procedure"
"github.com/CeresDB/ceresmeta/server/storage"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,11 @@ import (
"context"
"encoding/json"
"fmt"
"github.com/CeresDB/ceresmeta/server/cluster/metadata"
"sync"

"github.com/CeresDB/ceresdbproto/golang/pkg/metaservicepb"
"github.com/CeresDB/ceresmeta/pkg/log"
"github.com/CeresDB/ceresmeta/server/cluster/metadata"
"github.com/CeresDB/ceresmeta/server/coordinator/eventdispatch"
"github.com/CeresDB/ceresmeta/server/coordinator/procedure"
"github.com/CeresDB/ceresmeta/server/storage"
Expand Down Expand Up @@ -224,22 +224,22 @@ func dropDataTablesCallback(event *fsm.Event) {
}

for _, tableName := range params.SourceReq.PartitionTableInfo.SubTableNames {
table, dropTableResult, exists, err := dropTableMetaData(event, tableName)
dropTableResult, err := dropTableMetaData(event, tableName)
if err != nil {
procedure.CancelEventWithLog(event, err, fmt.Sprintf("drop table, table:%s", tableName))
return
}

if !exists {
if !dropTableResult.exists {
continue
}

if len(dropTableResult.ShardVersionUpdate) != 1 {
procedure.CancelEventWithLog(event, procedure.ErrDropTableResult, fmt.Sprintf("legnth of shardVersionResult!=1, current is %d", len(dropTableResult.ShardVersionUpdate)))
if len(dropTableResult.result.ShardVersionUpdate) != 1 {
procedure.CancelEventWithLog(event, procedure.ErrDropTableResult, fmt.Sprintf("legnth of shardVersionResult!=1, current is %d", len(dropTableResult.result.ShardVersionUpdate)))
return
}

if err := dispatchDropTable(event, table, dropTableResult.ShardVersionUpdate[0]); err != nil {
if err := dispatchDropTable(event, dropTableResult.table, dropTableResult.result.ShardVersionUpdate[0]); err != nil {
procedure.CancelEventWithLog(event, err, fmt.Sprintf("drop table, table:%s", tableName))
return
}
Expand All @@ -254,26 +254,26 @@ func dropPartitionTableCallback(event *fsm.Event) {
return
}

table, dropTableRet, exists, err := dropTableMetaData(event, req.tableName())
dropTableResult, err := dropTableMetaData(event, req.tableName())
if err != nil {
procedure.CancelEventWithLog(event, err, fmt.Sprintf("drop table, table:%s", req.tableName()))
return
}
if !exists {
if !dropTableResult.exists {
procedure.CancelEventWithLog(event, procedure.ErrTableNotExists, fmt.Sprintf("table:%s", req.tableName()))
return
}

if len(dropTableRet.ShardVersionUpdate) == 0 {
procedure.CancelEventWithLog(event, procedure.ErrDropTableResult, fmt.Sprintf("legnth of shardVersionResult need >=1, current is %d", len(dropTableRet.ShardVersionUpdate)))
if len(dropTableResult.result.ShardVersionUpdate) == 0 {
procedure.CancelEventWithLog(event, procedure.ErrDropTableResult, fmt.Sprintf("legnth of shardVersionResult need >=1, current is %d", len(dropTableResult.result.ShardVersionUpdate)))
return
}

req.table = table
req.table = dropTableResult.table

// Drop table in the first shard.
if err := dispatchDropTable(event, table, dropTableRet.ShardVersionUpdate[0]); err != nil {
procedure.CancelEventWithLog(event, err, fmt.Sprintf("drop table, table:%s", table.Name))
if err := dispatchDropTable(event, dropTableResult.table, dropTableResult.result.ShardVersionUpdate[0]); err != nil {
procedure.CancelEventWithLog(event, err, fmt.Sprintf("drop table, table:%s", dropTableResult.table.Name))
return
}
}
Expand All @@ -299,27 +299,44 @@ func finishCallback(event *fsm.Event) {
}
}

func dropTableMetaData(event *fsm.Event, tableName string) (storage.Table, metadata.DropTableResult, bool, error) {
type DropTableMetaDataResult struct {
table storage.Table
result metadata.DropTableResult
exists bool
}

func dropTableMetaData(event *fsm.Event, tableName string) (DropTableMetaDataResult, error) {
request, err := procedure.GetRequestFromEvent[*callbackRequest](event)
if err != nil {
return storage.Table{}, metadata.DropTableResult{}, false, errors.WithMessage(err, "get request from event")
return DropTableMetaDataResult{
table: storage.Table{},
result: metadata.DropTableResult{},
exists: false,
}, errors.WithMessage(err, "get request from event")
}

table, exists, err := request.p.params.ClusterMetadata.GetTable(request.schemaName(), tableName)
if err != nil {
return storage.Table{}, metadata.DropTableResult{}, false, errors.WithMessage(err, "cluster get table")
return DropTableMetaDataResult{
table: storage.Table{},
result: metadata.DropTableResult{},
exists: false,
}, errors.WithMessage(err, "cluster get table")
}
if !exists {
log.Warn("drop non-existing table", zap.String("schema", request.schemaName()), zap.String("table", tableName))
return storage.Table{}, metadata.DropTableResult{}, false, nil
return DropTableMetaDataResult{storage.Table{}, metadata.DropTableResult{}, false}, nil
}

result, err := request.p.params.ClusterMetadata.DropTable(request.ctx, request.schemaName(), tableName)
if err != nil {
return storage.Table{}, metadata.DropTableResult{}, false, errors.WithMessage(err, "cluster drop table")
return DropTableMetaDataResult{storage.Table{}, metadata.DropTableResult{}, false}, errors.WithMessage(err, "cluster drop table")
}

return table, result, true, nil
return DropTableMetaDataResult{
table: table,
result: result,
exists: true,
}, nil
}

func dispatchDropTable(event *fsm.Event, table storage.Table, version metadata.ShardVersionUpdate) error {
Expand Down
4 changes: 2 additions & 2 deletions server/coordinator/procedure/ddl/droptable/drop_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@ package droptable
import (
"context"
"fmt"
"github.com/CeresDB/ceresmeta/server/cluster/metadata"
"sync"

"github.com/CeresDB/ceresdbproto/golang/pkg/metaservicepb"
"github.com/CeresDB/ceresmeta/pkg/log"
"github.com/CeresDB/ceresmeta/server/cluster/metadata"
"github.com/CeresDB/ceresmeta/server/coordinator/eventdispatch"
"github.com/CeresDB/ceresmeta/server/coordinator/procedure"
"github.com/CeresDB/ceresmeta/server/storage"
Expand Down Expand Up @@ -215,7 +215,7 @@ func validateTable(params ProcedureParams) (storage.ShardID, error) {
}
}

return 0, errors.WithMessage(metadata.ErrShardNotFound, fmt.Sprintf("The shard corresponding to the table was not found, schemaName:%s,tableName:%s", params.SourceReq.GetSchemaName(), params.SourceReq.GetName()))
return 0, errors.WithMessagef(metadata.ErrShardNotFound, "The shard corresponding to the table was not found, schema:%s, table:%s", params.SourceReq.GetSchemaName(), params.SourceReq.GetName())
}

type Procedure struct {
Expand Down

0 comments on commit 53f264e

Please sign in to comment.