Skip to content
This repository has been archived by the owner on Feb 6, 2024. It is now read-only.

feat: support drop partition table #126

Merged
merged 9 commits into from
Jan 11, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
wip
  • Loading branch information
chunshao90 committed Jan 6, 2023
commit f26a562b25799bdca62b0e61959ab35a6f6cb640
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -101,3 +101,5 @@ require (
gopkg.in/yaml.v3 v3.0.1 // indirect
sigs.k8s.io/yaml v1.2.0 // indirect
)

replace github.com/CeresDB/ceresdbproto/golang v0.0.0-20230103124322-6e12241032e0 => github.com/chunshao90/ceresdbproto/golang v0.0.0-20230105030022-760be4219100
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@ github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03
github.com/BurntSushi/toml v1.1.0 h1:ksErzDEI1khOiGPgpwuI7x2ebx/uXQNw7xJpn9Eq1+I=
github.com/BurntSushi/toml v1.1.0/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbicEuybxQ=
github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo=
github.com/CeresDB/ceresdbproto/golang v0.0.0-20230103124322-6e12241032e0 h1:i+15uTiAzkBuz97PfOd5j+H+5yaVCNr5LhcM6oz2PEs=
github.com/CeresDB/ceresdbproto/golang v0.0.0-20230103124322-6e12241032e0/go.mod h1:qLTh6jtSu2ZLIFsU3iiDIKvkrQvyY/Csg6Mk0Ub0QZ4=
github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU=
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
Expand Down Expand Up @@ -53,6 +51,8 @@ github.com/cespare/xxhash/v2 v2.1.1 h1:6MnRN8NT7+YBpUIWxHtefFZOKTAPgGjpQSxqLNn0+
github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/chavacava/garif v0.0.0-20220316182200-5cad0b5181d4 h1:tFXjAxje9thrTF4h57Ckik+scJjTWdwAtZqZPtOT48M=
github.com/chavacava/garif v0.0.0-20220316182200-5cad0b5181d4/go.mod h1:W8EnPSQ8Nv4fUjc/v1/8tHFqhuOJXnRub0dTfuAQktU=
github.com/chunshao90/ceresdbproto/golang v0.0.0-20230105030022-760be4219100 h1:4+AbnTJBxXJy056XBFeEmFn5pM6MigNuDJLfZ+QE+/0=
github.com/chunshao90/ceresdbproto/golang v0.0.0-20230105030022-760be4219100/go.mod h1:qLTh6jtSu2ZLIFsU3iiDIKvkrQvyY/Csg6Mk0Ub0QZ4=
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc=
github.com/cncf/udpa/go v0.0.0-20201120205902-5459f2c99403/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk=
Expand Down
6 changes: 6 additions & 0 deletions server/coordinator/eventdispatch/dispatch.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ type Dispatch interface {
CloseShard(context context.Context, address string, request CloseShardRequest) error
CreateTableOnShard(context context.Context, address string, request CreateTableOnShardRequest) error
DropTableOnShard(context context.Context, address string, request DropTableOnShardRequest) error
CloseTableOnShard(context context.Context, address string, request CloseTableOnShardRequest) error
}

type OpenShardRequest struct {
Expand Down Expand Up @@ -42,3 +43,8 @@ type DropTableOnShardRequest struct {
UpdateShardInfo UpdateShardInfo
TableInfo cluster.TableInfo
}

type CloseTableOnShardRequest struct {
UpdateShardInfo UpdateShardInfo
TableInfo cluster.TableInfo
}
22 changes: 22 additions & 0 deletions server/coordinator/eventdispatch/dispatch_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,21 @@ func (d *DispatchImpl) DropTableOnShard(ctx context.Context, addr string, reques
return nil
}

func (d *DispatchImpl) CloseTableOnShard(ctx context.Context, addr string, request CloseTableOnShardRequest) error {
client, err := d.getMetaEventClient(ctx, addr)
if err != nil {
return err
}
resp, err := client.CloseTableOnShard(ctx, convertCloseTableOnShardRequestToPB(request))
if err != nil {
return errors.WithMessage(err, "close table on shard")
chunshao90 marked this conversation as resolved.
Show resolved Hide resolved
}
if resp.GetHeader().Code != 0 {
return ErrDispatch.WithCausef("close table on shard, err:%s", resp.GetHeader().GetError())
chunshao90 marked this conversation as resolved.
Show resolved Hide resolved
}
return nil
}

func (d *DispatchImpl) getGrpcClient(ctx context.Context, addr string) (*grpc.ClientConn, error) {
client, ok := d.conns.Load(addr)
if !ok {
Expand Down Expand Up @@ -128,6 +143,13 @@ func convertDropTableOnShardRequestToPB(request DropTableOnShardRequest) *metaev
}
}

func convertCloseTableOnShardRequestToPB(request CloseTableOnShardRequest) *metaeventpb.CloseTableOnShardRequest {
return &metaeventpb.CloseTableOnShardRequest{
UpdateShardInfo: convertUpdateShardInfoToPB(request.UpdateShardInfo),
TableInfo: cluster.ConvertTableInfoToPB(request.TableInfo),
}
}

func convertUpdateShardInfoToPB(updateShardInfo UpdateShardInfo) *metaeventpb.UpdateShardInfo {
return &metaeventpb.UpdateShardInfo{
CurrShardInfo: cluster.ConvertShardsInfoToPB(updateShardInfo.CurrShardInfo),
Expand Down
115 changes: 91 additions & 24 deletions server/coordinator/procedure/drop_partition_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,29 +19,33 @@ import (
)

// fsm state change:
// ┌────────┐ ┌──────────────────────┐ ┌────────────────────┐ ┌───────────┐
// │ Begin ├─────▶ DropDataTable ├─────▶ DropPartitionTable ├─────▶ Finish │
// └────────┘ └──────────────────────┘ └────────────────────┘ └───────────┘
// ┌────────┐ ┌──────────────────────┐ ┌────────────────────┐ ┌──────────────────────┐ ┌───────────
// │ Begin ├─────▶ DropDataTable ├─────▶ DropPartitionTable ├─────▶ ClosePartitionTables ├─────▶ Finish │
// └────────┘ └──────────────────────┘ └────────────────────┘ └──────────────────────┘ └───────────
const (
eventDropDataTable = "EventDropDataTable"
eventDropPartitionTable = "EventDropPartitionTable"
eventDropPartitionTableFinish = "EventSuccess"
eventClosePartitionTables = "EventClosePartitionTables"
eventDropPartitionTableFinish = "eventDropPartitionTableFinish"
chunshao90 marked this conversation as resolved.
Show resolved Hide resolved

stateDropPartitionTableBegin = "StateBegin"
chunshao90 marked this conversation as resolved.
Show resolved Hide resolved
stateDropDataTable = "StateDropDataTable"
stateDropPartitionTable = "StateDropPartitionTable"
stateDropPartitionTableFinish = "StateFinish"
stateClosePartitionTables = "StateClosePartitionTables"
stateDropPartitionTableFinish = "StateDropPartitionTableFinish"
)
chunshao90 marked this conversation as resolved.
Show resolved Hide resolved

var (
createDropPartitionTableEvents = fsm.Events{
{Name: eventDropDataTable, Src: []string{stateDropPartitionTableBegin}, Dst: stateDropDataTable},
{Name: eventDropPartitionTable, Src: []string{stateDropDataTable}, Dst: stateDropPartitionTable},
{Name: eventDropPartitionTableFinish, Src: []string{stateDropPartitionTable}, Dst: stateDropPartitionTableFinish},
{Name: eventClosePartitionTables, Src: []string{stateDropPartitionTable}, Dst: stateClosePartitionTables},
{Name: eventDropPartitionTableFinish, Src: []string{stateClosePartitionTables}, Dst: stateDropPartitionTableFinish},
}
createDropPartitionTableCallbacks = fsm.Callbacks{
eventDropDataTable: dropDataTablesCallback,
eventDropPartitionTable: dropPartitionTableCallback,
eventClosePartitionTables: closePartitionTableCallback,
eventDropPartitionTableFinish: finishDropPartitionTableCallback,
}
)
Expand Down Expand Up @@ -182,13 +186,14 @@ func (p *DropPartitionTableProcedure) convertToMeta() (Meta, error) {
defer p.lock.RUnlock()

rawData := dropPartitionTableRawData{
ID: p.id,
FsmState: p.fsm.Current(),
State: p.state,
ID: p.id,
FsmState: p.fsm.Current(),
State: p.state,
DropTableRequest: p.req,
}
rawDataBytes, err := json.Marshal(rawData)
if err != nil {
return Meta{}, ErrEncodeRawData.WithCausef("marshal raw data, procedureID:%v, err:%v", p.id, err)
return Meta{}, ErrEncodeRawData.WithCausef("marshal raw data, procedureID:%d, err:%v", p.id, err)
}

meta := Meta{
Expand All @@ -207,7 +212,7 @@ type dropPartitionTableRawData struct {
FsmState string
State State

DropTableResult cluster.DropTableResult
DropTableRequest *metaservicepb.DropTableRequest
}

type dropPartitionTableCallbackRequest struct {
Expand All @@ -223,6 +228,14 @@ type dropPartitionTableCallbackRequest struct {
ret cluster.TableInfo
}

func (d *dropPartitionTableCallbackRequest) schemaName() string {
return d.sourceReq.GetSchemaName()
}

func (d *dropPartitionTableCallbackRequest) tableName() string {
return d.sourceReq.GetName()
}

// 1. Drop data tables in target nodes.
func dropDataTablesCallback(event *fsm.Event) {
req, err := getRequestFromEvent[*dropPartitionTableCallbackRequest](event)
Expand All @@ -247,12 +260,56 @@ func dropPartitionTableCallback(event *fsm.Event) {
return
}

err = dropTable(event, req.sourceReq.Name)
err = dropTable(event, req.tableName())
if err != nil {
cancelEventWithLog(event, err, fmt.Sprintf("drop table, table:%s", req.sourceReq.Name))
}
}

// 3. Close partition table in target node.
func closePartitionTableCallback(event *fsm.Event) {
request, err := getRequestFromEvent[*dropPartitionTableCallbackRequest](event)
if err != nil {
cancelEventWithLog(event, err, "get request from event")
return
}

table, exists, err := request.cluster.GetTable(request.schemaName(), request.tableName())
if err != nil {
cancelEventWithLog(event, err, "get table", zap.String("schemaName", request.sourceReq.GetSchemaName()), zap.String("tableName", request.sourceReq.GetName()))
return
}

if !exists {
log.Warn("drop non-existing table", zap.String("schema", request.schemaName()), zap.String("table", request.tableName()))
return
}

shardNodes, version, err := getShardNodes(request.cluster, table)
if err != nil {
cancelEventWithLog(event, err, "get shard nodes")
return
}
tableInfo := cluster.TableInfo{
ID: table.ID,
Name: table.Name,
SchemaID: table.SchemaID,
SchemaName: request.rawReq.GetSchemaName(),
}

for _, shardNode := range shardNodes {

// Reopen partition table shard.
if err = request.dispatch.CloseTableOnShard(request.ctx, shardNode.NodeName, eventdispatch.CloseTableOnShardRequest{
UpdateShardInfo: eventdispatch.UpdateShardInfo{},
TableInfo: tableInfo,
}); err != nil {
cancelEventWithLog(event, err, "close shard")
return
}
}
}

func finishDropPartitionTableCallback(event *fsm.Event) {
req, err := getRequestFromEvent[*dropPartitionTableCallbackRequest](event)
if err != nil {
Expand All @@ -261,44 +318,53 @@ func finishDropPartitionTableCallback(event *fsm.Event) {
}
log.Info("drop partition table finish")

if err := req.onSucceeded(req.ret); err != nil {
if err = req.onSucceeded(req.ret); err != nil {
cancelEventWithLog(event, err, "drop partition table on succeeded")
return
}
}

func getShardNodes(cluster *cluster.Cluster, tableID storage.TableID) ([]storage.ShardNode, map[storage.ShardID]uint64, error) {
shardNodesResult, err := cluster.GetShardNodeByTableIDs([]storage.TableID{tableID})
if err != nil {
return nil, nil, errors.WithMessage(err, "cluster get shard by table id")
}

shardNodes, ok := shardNodesResult.ShardNodes[tableID]
if !ok {
return nil, nil, errors.WithMessagef(err, "cluster get shard by table id, tableID:%v", tableID)
}
return shardNodes, shardNodesResult.Version, nil
}

func dropTable(event *fsm.Event, tableName string) error {
request, err := getRequestFromEvent[*dropPartitionTableCallbackRequest](event)
if err != nil {
return errors.WithMessage(err, "get request from event")
}
table, exists, err := request.cluster.GetTable(request.sourceReq.GetSchemaName(), tableName)

table, exists, err := request.cluster.GetTable(request.schemaName(), tableName)
if err != nil {
return errors.WithMessage(err, "cluster get table")
}
if !exists {
log.Warn("drop non-existing table", zap.String("schema", request.sourceReq.GetSchemaName()), zap.String("table", tableName))
log.Warn("drop non-existing table", zap.String("schema", request.schemaName()), zap.String("table", tableName))
return nil
}

shardNodesResult, err := request.cluster.GetShardNodeByTableIDs([]storage.TableID{table.ID})
shardNodes, _, err := getShardNodes(request.cluster, table.ID)
if err != nil {
return errors.WithMessage(err, "cluster get shard by table id")
return err
}

result, err := request.cluster.DropTable(request.ctx, request.sourceReq.GetSchemaName(), tableName)
result, err := request.cluster.DropTable(request.ctx, request.schemaName(), tableName)
if err != nil {
return errors.WithMessage(err, "cluster drop table")
}

shardNodes, ok := shardNodesResult.ShardNodes[table.ID]
if !ok {
return errors.WithMessagef(err, "cluster get shard by table id, table:%v", table)
}

// TODO: consider followers
leader := storage.ShardNode{}
found := false
// When table is partition table, it will find the first leader to drop table.
for _, shardNode := range shardNodes {
if shardNode.ShardRole == storage.ShardRoleLeader {
found = true
Expand Down Expand Up @@ -331,5 +397,6 @@ func dropTable(event *fsm.Event, tableName string) error {
if err != nil {
return errors.WithMessage(err, "dispatch drop table on shard")
}

return nil
}
9 changes: 9 additions & 0 deletions server/coordinator/procedure/manager_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,7 @@ func (m *ManagerImpl) retry(ctx context.Context, procedure Procedure) error {
}

// Load meta and restore procedure.
// TODO: impl restoreProcedure
func restoreProcedure(meta *Meta) Procedure {
switch meta.Typ {
case Create:
Expand All @@ -176,6 +177,14 @@ func restoreProcedure(meta *Meta) Procedure {
return nil
case Scatter:
return nil
case CreateTable:
return nil
case DropTable:
return nil
case CreatePartitionTable:
return nil
case DropPartitionTable:
return nil
}
return nil
}