diff --git a/go.mod b/go.mod index 9dd05405..4f5a1c48 100644 --- a/go.mod +++ b/go.mod @@ -4,7 +4,7 @@ go 1.19 require ( github.com/AlekSi/gocov-xml v1.0.0 - github.com/CeresDB/ceresdbproto/golang v0.0.0-20221227065032-1ea76ec99beb + github.com/CeresDB/ceresdbproto/golang v0.0.0-20230106033057-6cc1754ffc31 github.com/axw/gocov v1.1.0 github.com/caarlos0/env/v6 v6.10.1 github.com/json-iterator/go v1.1.11 diff --git a/go.sum b/go.sum index f140d463..54fa2778 100644 --- a/go.sum +++ b/go.sum @@ -18,8 +18,8 @@ github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03 github.com/BurntSushi/toml v1.1.0 h1:ksErzDEI1khOiGPgpwuI7x2ebx/uXQNw7xJpn9Eq1+I= github.com/BurntSushi/toml v1.1.0/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbicEuybxQ= github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo= -github.com/CeresDB/ceresdbproto/golang v0.0.0-20221227065032-1ea76ec99beb h1:EsuLqjwg4vA7VpxBwZBg+WBwHhrbtaxe/LyiBRyt+LI= -github.com/CeresDB/ceresdbproto/golang v0.0.0-20221227065032-1ea76ec99beb/go.mod h1:qLTh6jtSu2ZLIFsU3iiDIKvkrQvyY/Csg6Mk0Ub0QZ4= +github.com/CeresDB/ceresdbproto/golang v0.0.0-20230106033057-6cc1754ffc31 h1:34yMmMaN3cW41a6GAPjXMWlk9Ql2RoyyodfQD6DLh40= +github.com/CeresDB/ceresdbproto/golang v0.0.0-20230106033057-6cc1754ffc31/go.mod h1:qLTh6jtSu2ZLIFsU3iiDIKvkrQvyY/Csg6Mk0Ub0QZ4= github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU= github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= diff --git a/server/coordinator/eventdispatch/dispatch.go b/server/coordinator/eventdispatch/dispatch.go index e1c4099f..f9cfd08c 100644 --- a/server/coordinator/eventdispatch/dispatch.go +++ b/server/coordinator/eventdispatch/dispatch.go @@ -13,6 +13,7 @@ type Dispatch interface { CloseShard(context context.Context, address string, request CloseShardRequest) error CreateTableOnShard(context context.Context, address string, request CreateTableOnShardRequest) error DropTableOnShard(context context.Context, address string, request DropTableOnShardRequest) error + CloseTableOnShard(context context.Context, address string, request CloseTableOnShardRequest) error } type OpenShardRequest struct { @@ -42,3 +43,8 @@ type DropTableOnShardRequest struct { UpdateShardInfo UpdateShardInfo TableInfo cluster.TableInfo } + +type CloseTableOnShardRequest struct { + UpdateShardInfo UpdateShardInfo + TableInfo cluster.TableInfo +} diff --git a/server/coordinator/eventdispatch/dispatch_impl.go b/server/coordinator/eventdispatch/dispatch_impl.go index 813f287e..9c280fac 100644 --- a/server/coordinator/eventdispatch/dispatch_impl.go +++ b/server/coordinator/eventdispatch/dispatch_impl.go @@ -33,10 +33,10 @@ func (d *DispatchImpl) OpenShard(ctx context.Context, addr string, request OpenS Shard: cluster.ConvertShardsInfoToPB(request.Shard), }) if err != nil { - return errors.WithMessage(err, "open shard") + return errors.WithMessagef(err, "open shard, addr:%s, request:%v", addr, request) } if resp.GetHeader().Code != 0 { - return ErrDispatch.WithCausef("open shard, err:%s", resp.GetHeader().GetError()) + return ErrDispatch.WithCausef("open shard, addr:%s, request:%v, err:%s", addr, request, resp.GetHeader().GetError()) } return nil } @@ -50,10 +50,10 @@ func (d *DispatchImpl) CloseShard(ctx context.Context, addr string, request 
Clos ShardId: request.ShardID, }) if err != nil { - return errors.WithMessage(err, "close shard") + return errors.WithMessagef(err, "close shard, addr:%s, request:%v", addr, request) } if resp.GetHeader().Code != 0 { - return ErrDispatch.WithCausef("close shard, err:%s", resp.GetHeader().GetError()) + return ErrDispatch.WithCausef("close shard, addr:%s, request:%v, err:%s", addr, request, resp.GetHeader().GetError()) } return nil } @@ -65,10 +65,10 @@ func (d *DispatchImpl) CreateTableOnShard(ctx context.Context, addr string, requ } resp, err := client.CreateTableOnShard(ctx, convertCreateTableOnShardRequestToPB(request)) if err != nil { - return errors.WithMessage(err, "create table on shard") + return errors.WithMessagef(err, "create table on shard, addr:%s, request:%v", addr, request) } if resp.GetHeader().Code != 0 { - return ErrDispatch.WithCausef("create table on shard, err:%s", resp.GetHeader().GetError()) + return ErrDispatch.WithCausef("create table on shard, addr:%s, request:%v, err:%s", addr, request, resp.GetHeader().GetError()) } return nil } @@ -80,10 +80,26 @@ func (d *DispatchImpl) DropTableOnShard(ctx context.Context, addr string, reques } resp, err := client.DropTableOnShard(ctx, convertDropTableOnShardRequestToPB(request)) if err != nil { - return errors.WithMessage(err, "drop table on shard") + return errors.WithMessagef(err, "drop table on shard, addr:%s, request:%v", addr, request) } if resp.GetHeader().Code != 0 { - return ErrDispatch.WithCausef("drop table on shard, err:%s", resp.GetHeader().GetError()) + return ErrDispatch.WithCausef("drop table on shard, addr:%s, request:%v, err:%s", addr, request, resp.GetHeader().GetError()) + } + return nil +} + +func (d *DispatchImpl) CloseTableOnShard(ctx context.Context, addr string, request CloseTableOnShardRequest) error { + client, err := d.getMetaEventClient(ctx, addr) + if err != nil { + return err + } + + resp, err := client.CloseTableOnShard(ctx, convertCloseTableOnShardRequestToPB(request)) + if err != nil { + return errors.WithMessagef(err, "close table on shard, addr:%s, request:%v", addr, request) + } + if resp.GetHeader().Code != 0 { + return ErrDispatch.WithCausef("close table on shard, addr:%s, request:%v, err:%s", addr, request, resp.GetHeader().GetError()) } return nil } @@ -128,6 +144,13 @@ func convertDropTableOnShardRequestToPB(request DropTableOnShardRequest) *metaev } } +func convertCloseTableOnShardRequestToPB(request CloseTableOnShardRequest) *metaeventpb.CloseTableOnShardRequest { + return &metaeventpb.CloseTableOnShardRequest{ + UpdateShardInfo: convertUpdateShardInfoToPB(request.UpdateShardInfo), + TableInfo: cluster.ConvertTableInfoToPB(request.TableInfo), + } +} + func convertUpdateShardInfoToPB(updateShardInfo UpdateShardInfo) *metaeventpb.UpdateShardInfo { return &metaeventpb.UpdateShardInfo{ CurrShardInfo: cluster.ConvertShardsInfoToPB(updateShardInfo.CurrShardInfo), diff --git a/server/coordinator/procedure/common_test.go b/server/coordinator/procedure/common_test.go index b562a6d7..bbc93cf0 100644 --- a/server/coordinator/procedure/common_test.go +++ b/server/coordinator/procedure/common_test.go @@ -48,6 +48,10 @@ func (m MockDispatch) DropTableOnShard(_ context.Context, _ string, _ eventdispa return nil } +func (m MockDispatch) CloseTableOnShard(_ context.Context, _ string, _ eventdispatch.CloseTableOnShardRequest) error { + return nil +} + func newTestEtcdStorage(t *testing.T) (storage.Storage, clientv3.KV, etcdutil.CloseFn) { _, client, closeSrv := etcdutil.PrepareEtcdServerAndClient(t) 
 	storage := storage.NewStorageWithEtcdBackend(client, testRootPath, storage.Options{
diff --git a/server/coordinator/procedure/create_drop_partition_table_test.go b/server/coordinator/procedure/create_drop_partition_table_test.go
new file mode 100644
index 00000000..f5eedbc7
--- /dev/null
+++ b/server/coordinator/procedure/create_drop_partition_table_test.go
@@ -0,0 +1,141 @@
+// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0.
+
+package procedure
+
+import (
+	"context"
+	"fmt"
+	"testing"
+
+	"github.com/CeresDB/ceresdbproto/golang/pkg/metaservicepb"
+	"github.com/CeresDB/ceresmeta/server/cluster"
+	"github.com/CeresDB/ceresmeta/server/coordinator/eventdispatch"
+	"github.com/stretchr/testify/require"
+)
+
+func TestCreateAndDropPartitionTable(t *testing.T) {
+	ctx := context.Background()
+	dispatch := MockDispatch{}
+	manager, c := prepare(t)
+	s := NewTestStorage(t)
+
+	shardPicker := NewRandomBalancedShardPicker(manager)
+
+	testTableNum := 20
+	testSubTableNum := 4
+
+	// Create the partition tables and their sub tables.
+	for i := 0; i < testTableNum; i++ {
+		tableName := fmt.Sprintf("%s_%d", testTableName0, i)
+		subTableNames := genSubTables(tableName, testSubTableNum)
+		testCreatePartitionTable(ctx, t, dispatch, c, s, shardPicker, tableName, subTableNames)
+	}
+
+	// Check that all created tables exist.
+	for i := 0; i < testTableNum; i++ {
+		tableName := fmt.Sprintf("%s_%d", testTableName0, i)
+		checkTable(t, c, tableName, true)
+		subTableNames := genSubTables(tableName, testSubTableNum)
+		for _, subTableName := range subTableNames {
+			checkTable(t, c, subTableName, true)
+		}
+	}
+
+	// Drop the partition tables.
+	for i := 0; i < testTableNum; i++ {
+		tableName := fmt.Sprintf("%s_%d", testTableName0, i)
+		subTableNames := genSubTables(tableName, testSubTableNum)
+		testDropPartitionTable(t, dispatch, c, s, tableName, subTableNames)
+	}
+
+	// Check that the dropped tables no longer exist.
+ for i := 0; i < testTableNum; i++ { + tableName := fmt.Sprintf("%s_%d", testTableName0, i) + checkTable(t, c, tableName, false) + subTableNames := genSubTables(tableName, testSubTableNum) + for _, subTableName := range subTableNames { + checkTable(t, c, subTableName, false) + } + } +} + +func testCreatePartitionTable(ctx context.Context, t *testing.T, dispatch eventdispatch.Dispatch, c *cluster.Cluster, s Storage, shardPicker ShardPicker, tableName string, subTableNames []string) { + re := require.New(t) + + request := &metaservicepb.CreateTableRequest{ + Header: &metaservicepb.RequestHeader{ + Node: nodeName0, + ClusterName: clusterName, + }, + PartitionTableInfo: &metaservicepb.PartitionTableInfo{ + SubTableNames: subTableNames, + }, + SchemaName: testSchemaName, + Name: tableName, + } + + getNodeShardResult, err := c.GetNodeShards(ctx) + re.NoError(err) + + nodeNames := make(map[string]int) + for _, nodeShard := range getNodeShardResult.NodeShards { + nodeNames[nodeShard.ShardNode.NodeName] = 1 + } + + partitionTableNum := Max(1, int(float32(len(nodeNames))*defaultPartitionTableProportionOfNodes)) + + partitionTableShards, err := shardPicker.PickShards(ctx, c.Name(), partitionTableNum, true) + re.NoError(err) + dataTableShards, err := shardPicker.PickShards(ctx, c.Name(), len(request.GetPartitionTableInfo().SubTableNames), true) + re.NoError(err) + + procedure := NewCreatePartitionTableProcedure(CreatePartitionTableProcedureRequest{ + 1, c, dispatch, s, request, partitionTableShards, dataTableShards, func(_ cluster.CreateTableResult) error { + return nil + }, func(_ error) error { + return nil + }, + }) + + err = procedure.Start(ctx) + re.NoError(err) +} + +func testDropPartitionTable(t *testing.T, dispatch eventdispatch.Dispatch, c *cluster.Cluster, s Storage, tableName string, subTableNames []string) { + re := require.New(t) + // New DropPartitionTableProcedure to drop table. 
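+	// The procedure drives the fsm through DropDataTable, DropPartitionTable and
+	// ClosePartitionTables before reaching Finish; see drop_partition_table.go.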
+	req := DropPartitionTableProcedureRequest{
+		ID:       uint64(1),
+		Dispatch: dispatch,
+		Cluster:  c,
+		Request: &metaservicepb.DropTableRequest{
+			Header: &metaservicepb.RequestHeader{
+				Node:        nodeName0,
+				ClusterName: clusterName,
+			},
+			SchemaName:         testSchemaName,
+			Name:               tableName,
+			PartitionTableInfo: &metaservicepb.PartitionTableInfo{SubTableNames: subTableNames},
+		},
+		OnSucceeded: func(_ cluster.TableInfo) error {
+			return nil
+		},
+		OnFailed: func(_ error) error {
+			return nil
+		},
+		Storage: s,
+	}
+
+	procedure := NewDropPartitionTableProcedure(req)
+	err := procedure.Start(context.Background())
+	re.NoError(err)
+}
+
+func genSubTables(tableName string, tableNum int) []string {
+	var subTableNames []string
+	for j := 0; j < tableNum; j++ {
+		subTableNames = append(subTableNames, fmt.Sprintf("%s_%d", tableName, j))
+	}
+	return subTableNames
+}
+
+func checkTable(t *testing.T, c *cluster.Cluster, tableName string, exist bool) {
+	re := require.New(t)
+	_, b, err := c.GetTable(testSchemaName, tableName)
+	re.NoError(err)
+	re.Equal(exist, b)
+}
diff --git a/server/coordinator/procedure/create_partition_table.go b/server/coordinator/procedure/create_partition_table.go
index bf377544..01d71642 100644
--- a/server/coordinator/procedure/create_partition_table.go
+++ b/server/coordinator/procedure/create_partition_table.go
@@ -128,7 +128,7 @@ func (p *CreatePartitionTableProcedure) Start(ctx context.Context) error {
 		}
 		if err := p.fsm.Event(eventCreatePartitionTable, createPartitionTableRequest); err != nil {
 			p.updateStateWithLock(StateFailed)
-			return errors.WithMessagef(err, "create partition table procedure create new shard view")
+			return errors.WithMessage(err, "create partition table procedure create new shard view")
 		}
 	case stateCreatePartitionTable:
 		if err := p.persist(ctx); err != nil {
@@ -136,7 +136,7 @@
 		}
 		if err := p.fsm.Event(eventCreateDataTables, createPartitionTableRequest); err != nil {
 			p.updateStateWithLock(StateFailed)
-			return errors.WithMessagef(err, "create partition table procedure create partition table")
+			return errors.WithMessage(err, "create partition table procedure create partition table")
 		}
 	case stateCreateDataTables:
 		if err := p.persist(ctx); err != nil {
@@ -144,7 +144,7 @@
 		}
 		if err := p.fsm.Event(eventOpenPartitionTables, createPartitionTableRequest); err != nil {
 			p.updateStateWithLock(StateFailed)
-			return errors.WithMessagef(err, "create partition table procedure create data tables")
+			return errors.WithMessage(err, "create partition table procedure create data tables")
 		}
 	case stateOpenPartitionTables:
 		if err := p.persist(ctx); err != nil {
@@ -152,7 +152,7 @@
 		}
 		if err := p.fsm.Event(eventFinish, createPartitionTableRequest); err != nil {
 			p.updateStateWithLock(StateFailed)
-			return errors.WithMessagef(err, "create partition table procedure open partition tables")
+			return errors.WithMessage(err, "create partition table procedure open partition tables")
 		}
 	case stateFinish:
 		// TODO: The state update sequence here is inconsistent with the previous one. Consider reconstructing the state update logic of the state machine.
diff --git a/server/coordinator/procedure/drop_partition_table.go b/server/coordinator/procedure/drop_partition_table.go
new file mode 100644
index 00000000..d53d00e4
--- /dev/null
+++ b/server/coordinator/procedure/drop_partition_table.go
@@ -0,0 +1,434 @@
+// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0.
+
+package procedure
+
+import (
+	"context"
+	"encoding/json"
+	"fmt"
+	"sync"
+
+	"github.com/CeresDB/ceresdbproto/golang/pkg/metaservicepb"
+	"github.com/CeresDB/ceresmeta/pkg/log"
+	"github.com/CeresDB/ceresmeta/server/cluster"
+	"github.com/CeresDB/ceresmeta/server/coordinator/eventdispatch"
+	"github.com/CeresDB/ceresmeta/server/storage"
+	"github.com/looplab/fsm"
+	"github.com/pkg/errors"
+	"go.uber.org/zap"
+)
+
+// fsm state change:
+// ┌───────┐      ┌───────────────┐      ┌────────────────────┐      ┌──────────────────────┐      ┌────────┐
+// │ Begin ├─────▶│ DropDataTable ├─────▶│ DropPartitionTable ├─────▶│ ClosePartitionTables ├─────▶│ Finish │
+// └───────┘      └───────────────┘      └────────────────────┘      └──────────────────────┘      └────────┘
+const (
+	eventDropDataTable            = "EventDropDataTable"
+	eventDropPartitionTable       = "EventDropPartitionTable"
+	eventClosePartitionTables     = "EventClosePartitionTables"
+	eventDropPartitionTableFinish = "EventDropPartitionTableFinish"
+
+	stateDropPartitionTableBegin  = "StateDropPartitionTableBegin"
+	stateDropDataTable            = "StateDropDataTable"
+	stateDropPartitionTable       = "StateDropPartitionTable"
+	stateClosePartitionTables     = "StateClosePartitionTables"
+	stateDropPartitionTableFinish = "StateDropPartitionTableFinish"
+)
+
+var (
+	dropPartitionTableEvents = fsm.Events{
+		{Name: eventDropDataTable, Src: []string{stateDropPartitionTableBegin}, Dst: stateDropDataTable},
+		{Name: eventDropPartitionTable, Src: []string{stateDropDataTable}, Dst: stateDropPartitionTable},
+		{Name: eventClosePartitionTables, Src: []string{stateDropPartitionTable}, Dst: stateClosePartitionTables},
+		{Name: eventDropPartitionTableFinish, Src: []string{stateClosePartitionTables}, Dst: stateDropPartitionTableFinish},
+	}
+	dropPartitionTableCallbacks = fsm.Callbacks{
+		eventDropDataTable:            dropDataTablesCallback,
+		eventDropPartitionTable:       dropPartitionTableCallback,
+		eventClosePartitionTables:     closePartitionTableCallback,
+		eventDropPartitionTableFinish: finishDropPartitionTableCallback,
+	}
+)
+
+type DropPartitionTableProcedure struct {
+	id       uint64
+	fsm      *fsm.FSM
+	cluster  *cluster.Cluster
+	dispatch eventdispatch.Dispatch
+	storage  Storage
+
+	onSucceeded func(cluster.TableInfo) error
+	onFailed    func(error) error
+
+	request *metaservicepb.DropTableRequest
+
+	// Protect the state.
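+	// All reads of state go through State() and all writes through updateStateWithLock().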
+	lock  sync.RWMutex
+	state State
+}
+
+type DropPartitionTableProcedureRequest struct {
+	ID          uint64
+	Cluster     *cluster.Cluster
+	Dispatch    eventdispatch.Dispatch
+	Storage     Storage
+	Request     *metaservicepb.DropTableRequest
+	OnSucceeded func(result cluster.TableInfo) error
+	OnFailed    func(error) error
+}
+
+func NewDropPartitionTableProcedure(request DropPartitionTableProcedureRequest) *DropPartitionTableProcedure {
+	fsm := fsm.NewFSM(
+		stateDropPartitionTableBegin,
+		dropPartitionTableEvents,
+		dropPartitionTableCallbacks,
+	)
+	return &DropPartitionTableProcedure{
+		id:          request.ID,
+		fsm:         fsm,
+		cluster:     request.Cluster,
+		dispatch:    request.Dispatch,
+		storage:     request.Storage,
+		request:     request.Request,
+		onSucceeded: request.OnSucceeded,
+		onFailed:    request.OnFailed,
+	}
+}
+
+func (p *DropPartitionTableProcedure) ID() uint64 {
+	return p.id
+}
+
+func (p *DropPartitionTableProcedure) Typ() Typ {
+	return DropPartitionTable
+}
+
+func (p *DropPartitionTableProcedure) Start(ctx context.Context) error {
+	p.updateStateWithLock(StateRunning)
+
+	dropPartitionTableRequest := &dropPartitionTableCallbackRequest{
+		ctx:         ctx,
+		cluster:     p.cluster,
+		dispatch:    p.dispatch,
+		request:     p.request,
+		onSucceeded: p.onSucceeded,
+		onFailed:    p.onFailed,
+	}
+
+	for {
+		switch p.fsm.Current() {
+		case stateDropPartitionTableBegin:
+			if err := p.persist(ctx); err != nil {
+				return errors.WithMessage(err, "drop partition table procedure persist")
+			}
+			if err := p.fsm.Event(eventDropDataTable, dropPartitionTableRequest); err != nil {
+				p.updateStateWithLock(StateFailed)
+				return errors.WithMessage(err, "drop partition table procedure begin")
+			}
+		case stateDropDataTable:
+			if err := p.persist(ctx); err != nil {
+				return errors.WithMessage(err, "drop partition table procedure persist")
+			}
+			if err := p.fsm.Event(eventDropPartitionTable, dropPartitionTableRequest); err != nil {
+				p.updateStateWithLock(StateFailed)
+				return errors.WithMessage(err, "drop partition table procedure drop data table")
+			}
+		case stateDropPartitionTable:
+			if err := p.persist(ctx); err != nil {
+				return errors.WithMessage(err, "drop partition table procedure persist")
+			}
+			if err := p.fsm.Event(eventClosePartitionTables, dropPartitionTableRequest); err != nil {
+				p.updateStateWithLock(StateFailed)
+				return errors.WithMessage(err, "drop partition table procedure drop partition table")
+			}
+		case stateClosePartitionTables:
+			if err := p.persist(ctx); err != nil {
+				return errors.WithMessage(err, "drop partition table procedure persist")
+			}
+			if err := p.fsm.Event(eventDropPartitionTableFinish, dropPartitionTableRequest); err != nil {
+				p.updateStateWithLock(StateFailed)
+				return errors.WithMessage(err, "drop partition table procedure close partition tables")
+			}
+		case stateDropPartitionTableFinish:
+			p.updateStateWithLock(StateFinished)
+			if err := p.persist(ctx); err != nil {
+				return errors.WithMessage(err, "drop partition table procedure persist")
+			}
+			return nil
+		}
+	}
+}
+
+func (p *DropPartitionTableProcedure) Cancel(_ context.Context) error {
+	p.updateStateWithLock(StateCancelled)
+	return nil
+}
+
+func (p *DropPartitionTableProcedure) State() State {
+	p.lock.RLock()
+	defer p.lock.RUnlock()
+
+	return p.state
+}
+
+func (p *DropPartitionTableProcedure) updateStateWithLock(state State) {
+	p.lock.Lock()
+	defer p.lock.Unlock()
+
+	p.state = state
+}
+
+func (p *DropPartitionTableProcedure) persist(ctx context.Context) error {
+	meta, err := p.convertToMeta()
+	if err != nil {
+		return errors.WithMessage(err, "convert to meta")
+	}
+	err = p.storage.CreateOrUpdate(ctx, meta)
+	if err != nil {
+		return errors.WithMessage(err, "createOrUpdate procedure storage")
+	}
+	return nil
+}
+
+func (p *DropPartitionTableProcedure) convertToMeta() (Meta, error) {
+	p.lock.RLock()
+	defer p.lock.RUnlock()
+
+	rawData := dropPartitionTableRawData{
+		ID:               p.id,
+		FsmState:         p.fsm.Current(),
+		State:            p.state,
+		DropTableRequest: p.request,
+	}
+	rawDataBytes, err := json.Marshal(rawData)
+	if err != nil {
+		return Meta{}, ErrEncodeRawData.WithCausef("marshal raw data, procedureID:%d, err:%v", p.id, err)
+	}
+
+	meta := Meta{
+		ID:    p.id,
+		Typ:   DropPartitionTable,
+		State: p.state,
+
+		RawData: rawDataBytes,
+	}
+
+	return meta, nil
+}
+
+type dropPartitionTableRawData struct {
+	ID       uint64
+	FsmState string
+	State    State
+
+	DropTableRequest *metaservicepb.DropTableRequest
+}
+
+type dropPartitionTableCallbackRequest struct {
+	ctx      context.Context
+	cluster  *cluster.Cluster
+	dispatch eventdispatch.Dispatch
+
+	onSucceeded func(info cluster.TableInfo) error
+	onFailed    func(error) error
+
+	request *metaservicepb.DropTableRequest
+	// versions and table are filled in by dropPartitionTableCallback and consumed by the later states.
+	versions []cluster.ShardVersionUpdate
+	table    storage.Table
+}
+
+func (d *dropPartitionTableCallbackRequest) schemaName() string {
+	return d.request.GetSchemaName()
+}
+
+func (d *dropPartitionTableCallbackRequest) tableName() string {
+	return d.request.GetName()
+}
+
+// 1. Drop data tables in target nodes.
+func dropDataTablesCallback(event *fsm.Event) {
+	req, err := getRequestFromEvent[*dropPartitionTableCallbackRequest](event)
+	if err != nil {
+		cancelEventWithLog(event, err, "get request from event")
+		return
+	}
+
+	if len(req.request.GetPartitionTableInfo().GetSubTableNames()) == 0 {
+		cancelEventWithLog(event, ErrEmptyPartitionNames, fmt.Sprintf("drop table, table:%s", req.request.Name))
+		return
+	}
+
+	for _, tableName := range req.request.GetPartitionTableInfo().GetSubTableNames() {
+		table, dropTableResult, exists, err := dropTableMetaData(event, tableName)
+		if err != nil {
+			cancelEventWithLog(event, err, fmt.Sprintf("drop table, table:%s", tableName))
+			return
+		}
+
+		if !exists {
+			continue
+		}
+
+		if len(dropTableResult.ShardVersionUpdate) != 1 {
+			cancelEventWithLog(event, ErrDropTableResult, fmt.Sprintf("length of shardVersionUpdate != 1, current is %d", len(dropTableResult.ShardVersionUpdate)))
+			return
+		}
+
+		if err := dispatchDropTable(event, table, dropTableResult.ShardVersionUpdate[0]); err != nil {
+			cancelEventWithLog(event, err, fmt.Sprintf("drop table, table:%s", tableName))
+			return
+		}
+	}
+}
+
+// 2. Drop partition table in target node.
+func dropPartitionTableCallback(event *fsm.Event) {
+	request, err := getRequestFromEvent[*dropPartitionTableCallbackRequest](event)
+	if err != nil {
+		cancelEventWithLog(event, err, "get request from event")
+		return
+	}
+
+	table, dropTableRet, exists, err := dropTableMetaData(event, request.tableName())
+	if err != nil {
+		cancelEventWithLog(event, err, fmt.Sprintf("drop table, table:%s", request.tableName()))
+		return
+	}
+	if !exists {
+		cancelEventWithLog(event, ErrTableNotExists, fmt.Sprintf("table:%s", request.tableName()))
+		return
+	}
+
+	if len(dropTableRet.ShardVersionUpdate) == 0 {
+		cancelEventWithLog(event, ErrDropTableResult, fmt.Sprintf("length of shardVersionUpdate must be >= 1, current is %d", len(dropTableRet.ShardVersionUpdate)))
+		return
+	}
+
+	request.versions = dropTableRet.ShardVersionUpdate
+	request.table = table
+
+	// Drop table in the first shard.
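+	// Only this first shard receives the drop request; the versions recorded above
+	// are used to close the table on every involved shard in the next state.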
+	if err := dispatchDropTable(event, table, dropTableRet.ShardVersionUpdate[0]); err != nil {
+		cancelEventWithLog(event, err, fmt.Sprintf("drop table, table:%s", table.Name))
+		return
+	}
+}
+
+// 3. Close partition table in target nodes.
+func closePartitionTableCallback(event *fsm.Event) {
+	request, err := getRequestFromEvent[*dropPartitionTableCallbackRequest](event)
+	if err != nil {
+		cancelEventWithLog(event, err, "get request from event")
+		return
+	}
+
+	var shardNodes []storage.ShardNode
+	for _, version := range request.versions {
+		ret, err := request.cluster.GetShardNodesByShardID(version.ShardID)
+		if err != nil {
+			cancelEventWithLog(event, err, "get shard nodes by shard id")
+			return
+		}
+		shardNodes = append(shardNodes, ret...)
+	}
+
+	tableInfo := cluster.TableInfo{
+		ID:         request.table.ID,
+		Name:       request.table.Name,
+		SchemaID:   request.table.SchemaID,
+		SchemaName: request.request.GetSchemaName(),
+	}
+
+	for _, shardNode := range shardNodes {
+		// Close the partition table on this shard. Closing does not bump the
+		// shard version, so an empty UpdateShardInfo is sent.
+		if err = request.dispatch.CloseTableOnShard(request.ctx, shardNode.NodeName, eventdispatch.CloseTableOnShardRequest{
+			UpdateShardInfo: eventdispatch.UpdateShardInfo{},
+			TableInfo:       tableInfo,
+		}); err != nil {
+			cancelEventWithLog(event, err, "close table on shard")
+			return
+		}
+	}
+}
+
+func finishDropPartitionTableCallback(event *fsm.Event) {
+	request, err := getRequestFromEvent[*dropPartitionTableCallbackRequest](event)
+	if err != nil {
+		cancelEventWithLog(event, err, "get request from event")
+		return
+	}
+	log.Info("drop partition table finish", zap.String("table", request.tableName()))
+
+	tableInfo := cluster.TableInfo{
+		ID:         request.table.ID,
+		Name:       request.table.Name,
+		SchemaID:   request.table.SchemaID,
+		SchemaName: request.request.GetSchemaName(),
+	}
+
+	if err = request.onSucceeded(tableInfo); err != nil {
+		cancelEventWithLog(event, err, "drop partition table on succeeded")
+		return
+	}
+}
+
+func dropTableMetaData(event *fsm.Event, tableName string) (storage.Table, cluster.DropTableResult, bool, error) {
+	request, err := getRequestFromEvent[*dropPartitionTableCallbackRequest](event)
+	if err != nil {
+		return storage.Table{}, cluster.DropTableResult{}, false, errors.WithMessage(err, "get request from event")
+	}
+
+	table, exists, err := request.cluster.GetTable(request.schemaName(), tableName)
+	if err != nil {
+		return storage.Table{}, cluster.DropTableResult{}, false, errors.WithMessage(err, "cluster get table")
+	}
+	if !exists {
+		log.Warn("drop non-existing table", zap.String("schema", request.schemaName()), zap.String("table", tableName))
+		return storage.Table{}, cluster.DropTableResult{}, false, nil
+	}
+
+	result, err := request.cluster.DropTable(request.ctx, request.schemaName(), tableName)
+	if err != nil {
+		return storage.Table{}, cluster.DropTableResult{}, false, errors.WithMessage(err, "cluster drop table")
+	}
+
+	return table, result, true, nil
+}
+
+func dispatchDropTable(event *fsm.Event, table storage.Table, version cluster.ShardVersionUpdate) error {
+	request, err := getRequestFromEvent[*dropPartitionTableCallbackRequest](event)
+	if err != nil {
+		return errors.WithMessage(err, "get request from event")
+	}
+	shardNodes, err := request.cluster.GetShardNodesByShardID(version.ShardID)
+	if err != nil {
+		// Just return the wrapped error; the caller is responsible for cancelling the event.
+		return errors.WithMessage(err, "cluster get shard nodes by shard id")
+	}
+
+	tableInfo := cluster.TableInfo{
+		ID:         table.ID,
+		Name:       table.Name,
+		SchemaID:   table.SchemaID,
+		SchemaName: request.request.GetSchemaName(),
+	}
+
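+	// Send the drop request to every node that currently owns the target shard.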
+	for _, shardNode := range shardNodes {
+		err = request.dispatch.DropTableOnShard(request.ctx, shardNode.NodeName, eventdispatch.DropTableOnShardRequest{
+			UpdateShardInfo: eventdispatch.UpdateShardInfo{
+				CurrShardInfo: cluster.ShardInfo{
+					ID:      version.ShardID,
+					Role:    storage.ShardRoleLeader,
+					Version: version.CurrVersion,
+				},
+				PrevVersion: version.PrevVersion,
+			},
+			TableInfo: tableInfo,
+		})
+		if err != nil {
+			return errors.WithMessage(err, "dispatch drop table on shard")
+		}
+	}
+
+	return nil
+}
diff --git a/server/coordinator/procedure/error.go b/server/coordinator/procedure/error.go
index e33fa7b0..d9a3090c 100644
--- a/server/coordinator/procedure/error.go
+++ b/server/coordinator/procedure/error.go
@@ -8,6 +8,7 @@ var (
 	ErrShardLeaderNotFound   = coderr.NewCodeError(coderr.Internal, "shard leader not found")
 	ErrProcedureNotFound     = coderr.NewCodeError(coderr.Internal, "procedure not found")
 	ErrClusterConfigChanged  = coderr.NewCodeError(coderr.Internal, "cluster config changed")
+	ErrTableNotExists        = coderr.NewCodeError(coderr.Internal, "table does not exist")
 	ErrTableAlreadyExists    = coderr.NewCodeError(coderr.Internal, "table already exists")
 	ErrProcedureTypeNotMatch = coderr.NewCodeError(coderr.Internal, "procedure type not match")
 	ErrDecodeRawData         = coderr.NewCodeError(coderr.Internal, "decode raw data")
diff --git a/server/coordinator/procedure/factory.go b/server/coordinator/procedure/factory.go
index cc7f4ff1..b0cab4c0 100644
--- a/server/coordinator/procedure/factory.go
+++ b/server/coordinator/procedure/factory.go
@@ -60,6 +60,10 @@ type DropTableRequest struct {
 	OnFailed func(error) error
 }
 
+func (d DropTableRequest) IsPartitionTable() bool {
+	return d.SourceReq.GetPartitionTableInfo() != nil
+}
+
 type TransferLeaderRequest struct {
 	ClusterName string
 	ShardID     storage.ShardID
@@ -202,6 +206,21 @@ func (f *Factory) CreateDropTableProcedure(ctx context.Context, request DropTabl
 	if err != nil {
 		return nil, err
 	}
+
+	if request.IsPartitionTable() {
+		req := DropPartitionTableProcedureRequest{
+			ID:          id,
+			Cluster:     request.Cluster,
+			Dispatch:    f.dispatch,
+			Storage:     f.storage,
+			Request:     request.SourceReq,
+			OnSucceeded: request.OnSucceeded,
+			OnFailed:    request.OnFailed,
+		}
+		procedure := NewDropPartitionTableProcedure(req)
+		return procedure, nil
+	}
+
 	procedure := NewDropTableProcedure(f.dispatch, request.Cluster, id, request.SourceReq, request.OnSucceeded, request.OnFailed)
 	return procedure, nil
diff --git a/server/coordinator/procedure/manager_impl.go b/server/coordinator/procedure/manager_impl.go
index 5d289389..4ce08692 100644
--- a/server/coordinator/procedure/manager_impl.go
+++ b/server/coordinator/procedure/manager_impl.go
@@ -179,6 +179,14 @@ func restoreProcedure(meta *Meta) Procedure {
 		return nil
 	case Scatter:
 		return nil
+	case CreateTable:
+		return nil
+	case DropTable:
+		return nil
+	case CreatePartitionTable:
+		return nil
+	case DropPartitionTable:
+		return nil
 	}
 	return nil
 }
diff --git a/server/coordinator/procedure/procedure.go b/server/coordinator/procedure/procedure.go
index 516f9be0..5ffd8e93 100644
--- a/server/coordinator/procedure/procedure.go
+++ b/server/coordinator/procedure/procedure.go
@@ -29,6 +29,7 @@ const (
 	CreateTable
 	DropTable
 	CreatePartitionTable
+	DropPartitionTable
 )
 
 // Procedure is used to describe how to execute a set of operations from the scheduler, e.g. SwitchLeaderProcedure, MergeShardProcedure.
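Compatibility note: adding CloseTableOnShard to the Dispatch interface is a breaking change for any other implementation of eventdispatch.Dispatch outside this diff; each one has to add the new method to keep compiling. A minimal no-op stub, mirroring the MockDispatch change above, might look like the following (OtherDispatch is a hypothetical implementer, not a type in this diff):

	// CloseTableOnShard satisfies the extended Dispatch interface.
	// OtherDispatch is a hypothetical implementer used only for illustration.
	func (m OtherDispatch) CloseTableOnShard(_ context.Context, _ string, _ eventdispatch.CloseTableOnShardRequest) error {
		return nil
	}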