diff --git a/server/cluster/metadata/cluster_metadata.go b/server/cluster/metadata/cluster_metadata.go index 16c2a325..72b6c9e5 100644 --- a/server/cluster/metadata/cluster_metadata.go +++ b/server/cluster/metadata/cluster_metadata.go @@ -251,6 +251,32 @@ func (c *ClusterMetadata) GetTable(schemaName, tableName string) (storage.Table, return c.tableManager.GetTable(schemaName, tableName) } +func (c *ClusterMetadata) CreateTableMetadata(ctx context.Context, request CreateTableMetadataRequest) (CreateTableMetadataResult, error) { + log.Info("create table start", zap.String("cluster", c.Name()), zap.String("schemaName", request.SchemaName), zap.String("tableName", request.TableName)) + + _, exists, err := c.tableManager.GetTable(request.SchemaName, request.TableName) + if err != nil { + return CreateTableMetadataResult{}, err + } + + if exists { + return CreateTableMetadataResult{}, ErrTableAlreadyExists + } + + // Create table in table manager. + table, err := c.tableManager.CreateTable(ctx, request.SchemaName, request.TableName, request.PartitionInfo) + if err != nil { + return CreateTableMetadataResult{}, errors.WithMessage(err, "table manager create table") + } + + res := CreateTableMetadataResult{ + Table: table, + } + + log.Info("create table metadata succeed", zap.String("cluster", c.Name()), zap.String("result", fmt.Sprintf("%+v", res)), zap.Object("result", res)) + return res, nil +} + func (c *ClusterMetadata) CreateTable(ctx context.Context, request CreateTableRequest) (CreateTableResult, error) { log.Info("create table start", zap.String("cluster", c.Name()), zap.String("schemaName", request.SchemaName), zap.String("tableName", request.TableName)) diff --git a/server/cluster/metadata/types.go b/server/cluster/metadata/types.go index e5ad1aa1..0eaffb55 100644 --- a/server/cluster/metadata/types.go +++ b/server/cluster/metadata/types.go @@ -48,6 +48,16 @@ type CreateClusterOpts struct { ShardTotal uint32 } +type CreateTableMetadataRequest struct { + SchemaName string + TableName string + PartitionInfo storage.PartitionInfo +} + +type CreateTableMetadataResult struct { + Table storage.Table +} + type CreateTableRequest struct { ShardID storage.ShardID SchemaName string diff --git a/server/coordinator/factory.go b/server/coordinator/factory.go index dbf485f6..baf3e5d3 100644 --- a/server/coordinator/factory.go +++ b/server/coordinator/factory.go @@ -37,8 +37,8 @@ type Factory struct { } type CreateTableRequest struct { - Cluster *metadata.ClusterMetadata - SourceReq *metaservicepb.CreateTableRequest + ClusterMetadata *metadata.ClusterMetadata + SourceReq *metaservicepb.CreateTableRequest OnSucceeded func(metadata.CreateTableResult) error OnFailed func(error) error @@ -49,8 +49,8 @@ func (request *CreateTableRequest) isPartitionTable() bool { } type DropTableRequest struct { - Cluster *metadata.ClusterMetadata - SourceReq *metaservicepb.DropTableRequest + ClusterMetadata *metadata.ClusterMetadata + SourceReq *metaservicepb.DropTableRequest OnSucceeded func(metadata.TableInfo) error OnFailed func(error) error @@ -102,7 +102,7 @@ func (f *Factory) MakeCreateTableProcedure(ctx context.Context, request CreateTa if isPartitionTable { return f.makeCreatePartitionTableProcedure(ctx, CreatePartitionTableRequest{ - Cluster: request.Cluster, + Cluster: request.ClusterMetadata, SourceReq: request.SourceReq, PartitionTableRatioOfNodes: f.partitionTableProportionOfNodes, OnSucceeded: request.OnSucceeded, @@ -118,6 +118,8 @@ func (f *Factory) makeCreateTableProcedure(ctx context.Context, request CreateTa if err != nil { return nil, err } + snapshot := request.ClusterMetadata.GetClusterSnapshot() + shards, err := f.shardPicker.PickShards(ctx, request.Cluster.Name(), 1, false) if err != nil { log.Error("pick table shard", zap.Error(err)) @@ -129,15 +131,14 @@ func (f *Factory) makeCreateTableProcedure(ctx context.Context, request CreateTa } procedure := createtable.NewProcedure(createtable.ProcedureRequest{ - Dispatch: f.dispatch, - Cluster: request.Cluster, - ID: id, - ShardID: shards[0].ShardInfo.ID, - ShardVersion: shards[0].ShardInfo.Version, - ClusterVersion: request.Cluster.GetClusterViewVersion(), - Req: request.SourceReq, - OnSucceeded: request.OnSucceeded, - OnFailed: request.OnFailed, + Dispatch: f.dispatch, + ClusterMetadata: request.ClusterMetadata, + ClusterSnapshot: snapshot, + ID: id, + ShardID: shards[0].ShardInfo.ID, + Req: request.SourceReq, + OnSucceeded: request.OnSucceeded, + OnFailed: request.OnFailed, }) return procedure, nil } @@ -205,7 +206,7 @@ func (f *Factory) CreateDropTableProcedure(ctx context.Context, request DropTabl return procedure, nil } - procedure := droptable.NewDropTableProcedure(f.dispatch, request.Cluster, id, + procedure := droptable.NewDropTableProcedure(f.dispatch, request.ClusterMetadata, id, request.SourceReq, request.OnSucceeded, request.OnFailed) return procedure, nil } diff --git a/server/coordinator/procedure/create_table_common_util.go b/server/coordinator/procedure/ddl/create_table_common_util.go similarity index 89% rename from server/coordinator/procedure/create_table_common_util.go rename to server/coordinator/procedure/ddl/create_table_common_util.go index 0f68dba8..1c817530 100644 --- a/server/coordinator/procedure/create_table_common_util.go +++ b/server/coordinator/procedure/ddl/create_table_common_util.go @@ -1,6 +1,6 @@ // Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0. -package procedure +package ddl import ( "context" @@ -9,6 +9,7 @@ import ( "github.com/CeresDB/ceresdbproto/golang/pkg/metaservicepb" "github.com/CeresDB/ceresmeta/server/cluster/metadata" "github.com/CeresDB/ceresmeta/server/coordinator/eventdispatch" + "github.com/CeresDB/ceresmeta/server/coordinator/procedure" "github.com/CeresDB/ceresmeta/server/storage" "github.com/pkg/errors" ) @@ -19,7 +20,7 @@ func CreateTableMetadata(ctx context.Context, c *metadata.ClusterMetadata, schem return metadata.CreateTableResult{}, errors.WithMessage(err, "cluster get table") } if exists { - return metadata.CreateTableResult{}, errors.WithMessagef(ErrTableAlreadyExists, "create an existing table, schemaName:%s, tableName:%s", schemaName, tableName) + return metadata.CreateTableResult{}, errors.WithMessagef(procedure.ErrTableAlreadyExists, "create an existing table, schemaName:%s, tableName:%s", schemaName, tableName) } createTableResult, err := c.CreateTable(ctx, metadata.CreateTableRequest{ @@ -50,7 +51,7 @@ func CreateTableOnShard(ctx context.Context, c *metadata.ClusterMetadata, dispat } } if !found { - return errors.WithMessagef(ErrShardLeaderNotFound, "shard node can't find leader, shardID:%d", shardID) + return errors.WithMessagef(procedure.ErrShardLeaderNotFound, "shard node can't find leader, shardID:%d", shardID) } err = dispatch.CreateTableOnShard(ctx, leader.NodeName, request) diff --git a/server/coordinator/procedure/ddl/createpartitiontable/create_partition_table.go b/server/coordinator/procedure/ddl/createpartitiontable/create_partition_table.go new file mode 100644 index 00000000..6b15d94c --- /dev/null +++ b/server/coordinator/procedure/ddl/createpartitiontable/create_partition_table.go @@ -0,0 +1,308 @@ +// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0. + +package createpartitiontable + +import ( + "context" + "encoding/json" + "fmt" + "sync" + + "github.com/CeresDB/ceresdbproto/golang/pkg/metaservicepb" + "github.com/CeresDB/ceresmeta/pkg/log" + "github.com/CeresDB/ceresmeta/server/cluster/metadata" + "github.com/CeresDB/ceresmeta/server/coordinator/eventdispatch" + "github.com/CeresDB/ceresmeta/server/coordinator/procedure" + "github.com/CeresDB/ceresmeta/server/coordinator/procedure/ddl" + "github.com/CeresDB/ceresmeta/server/storage" + "github.com/looplab/fsm" + "github.com/pkg/errors" + "go.uber.org/zap" +) + +// fsm state change: +// ┌────────┐ ┌──────────────────────┐ ┌────────────────────┐ ┌───────────┐ +// │ Begin ├─────▶ CreatePartitionTable ├─────▶ CreateDataTables ├──────▶ Finish │ +// └────────┘ └──────────────────────┘ └────────────────────┘ └───────────┘ +const ( + eventCreatePartitionTable = "EventCreatePartitionTable" + eventCreateSubTables = "EventCreateSubTables" + eventFinish = "EventFinish" + + stateBegin = "StateBegin" + stateCreatePartitionTable = "StateCreatePartitionTable" + stateCreateSubTables = "StateCreateSubTables" + stateFinish = "StateFinish" +) + +var ( + createPartitionTableEvents = fsm.Events{ + {Name: eventCreatePartitionTable, Src: []string{stateBegin}, Dst: stateCreatePartitionTable}, + {Name: eventCreateSubTables, Src: []string{stateCreatePartitionTable}, Dst: stateCreateSubTables}, + {Name: eventFinish, Src: []string{stateCreateSubTables}, Dst: stateFinish}, + } + createPartitionTableCallbacks = fsm.Callbacks{ + eventCreatePartitionTable: createPartitionTableCallback, + eventCreateSubTables: createDataTablesCallback, + eventFinish: finishCallback, + } +) + +type Procedure struct { + fsm *fsm.FSM + params ProcedureParams + relatedVersionInfo procedure.RelatedVersionInfo + createPartitionTableResult metadata.CreateTableMetadataResult + + lock sync.RWMutex + state procedure.State +} + +type ProcedureParams struct { + ID uint64 + ClusterMetadata *metadata.ClusterMetadata + ClusterSnapshot metadata.Snapshot + Dispatch eventdispatch.Dispatch + Storage procedure.Storage + SourceReq *metaservicepb.CreateTableRequest + SubTablesShards []metadata.ShardNodeWithVersion + OnSucceeded func(metadata.CreateTableResult) error + OnFailed func(error) error +} + +func NewProcedure(params ProcedureParams) *Procedure { + relatedVersionInfo := buildRelatedVersionInfo(params) + + fsm := fsm.NewFSM( + stateBegin, + createPartitionTableEvents, + createPartitionTableCallbacks, + ) + + return &Procedure{ + fsm: fsm, + relatedVersionInfo: relatedVersionInfo, + params: params, + state: procedure.StateInit, + } +} + +func buildRelatedVersionInfo(params ProcedureParams) procedure.RelatedVersionInfo { + shardWithVersion := make(map[storage.ShardID]uint64, len(params.SubTablesShards)) + shardIDs := make([]storage.ShardID, 0, len(params.SubTablesShards)) + for _, shardView := range params.SubTablesShards { + shardIDs = append(shardIDs, shardView.ShardInfo.ID) + } + for _, shardID := range shardIDs { + for _, shardView := range params.ClusterSnapshot.Topology.ShardViews { + if shardView.ShardID == shardID { + shardWithVersion[shardID] = shardView.Version + continue + } + } + } + return procedure.RelatedVersionInfo{ + ClusterID: params.ClusterSnapshot.Topology.ClusterView.ClusterID, + ShardWithVersion: shardWithVersion, + ClusterVersion: params.ClusterSnapshot.Topology.ClusterView.Version, + } +} + +func (p *Procedure) ID() uint64 { + return p.params.ID +} + +func (p *Procedure) Typ() procedure.Typ { + return procedure.CreatePartitionTable +} + +func (p *Procedure) RelatedVersionInfo() procedure.RelatedVersionInfo { + return p.relatedVersionInfo +} + +func (p *Procedure) Priority() procedure.Priority { + return procedure.PriorityLow +} + +func (p *Procedure) Start(ctx context.Context) error { + p.updateStateWithLock(procedure.StateRunning) + + createPartitionTableRequest := &callbackRequest{ + ctx: ctx, + p: p, + } + + for { + switch p.fsm.Current() { + case stateBegin: + if err := p.persist(ctx); err != nil { + return errors.WithMessage(err, "persist create partition table procedure") + } + if err := p.fsm.Event(eventCreatePartitionTable, createPartitionTableRequest); err != nil { + p.updateStateWithLock(procedure.StateFailed) + return errors.WithMessage(err, "create partition table") + } + case stateCreatePartitionTable: + if err := p.persist(ctx); err != nil { + return errors.WithMessage(err, "persist create partition table procedure") + } + if err := p.fsm.Event(eventCreateSubTables, createPartitionTableRequest); err != nil { + p.updateStateWithLock(procedure.StateFailed) + return errors.WithMessage(err, "create data tables") + } + case stateCreateSubTables: + if err := p.persist(ctx); err != nil { + return errors.WithMessage(err, "persist create partition table procedure") + } + if err := p.fsm.Event(eventFinish, createPartitionTableRequest); err != nil { + p.updateStateWithLock(procedure.StateFailed) + return errors.WithMessage(err, "update table shard metadata") + } + case stateFinish: + // TODO: The state update sequence here is inconsistent with the previous one. Consider reconstructing the state update logic of the state machine. + p.updateStateWithLock(procedure.StateFinished) + if err := p.persist(ctx); err != nil { + return errors.WithMessage(err, "create partition table procedure persist") + } + return nil + } + } +} + +func (p *Procedure) Cancel(_ context.Context) error { + p.updateStateWithLock(procedure.StateCancelled) + return nil +} + +func (p *Procedure) State() procedure.State { + p.lock.RLock() + defer p.lock.RUnlock() + + return p.state +} + +type callbackRequest struct { + ctx context.Context + p *Procedure +} + +// 1. Create partition table in target node. +func createPartitionTableCallback(event *fsm.Event) { + req, err := procedure.GetRequestFromEvent[*callbackRequest](event) + if err != nil { + procedure.CancelEventWithLog(event, err, "get request from event") + return + } + params := req.p.params + + createTableMetadaResult, err := params.ClusterMetadata.CreateTableMetadata(req.ctx, metadata.CreateTableMetadataRequest{ + SchemaName: params.SourceReq.GetSchemaName(), + TableName: params.SourceReq.GetName(), + PartitionInfo: storage.PartitionInfo{Info: params.SourceReq.PartitionTableInfo.GetPartitionInfo()}, + }) + if err != nil { + procedure.CancelEventWithLog(event, err, "create table metadata") + return + } + req.p.createPartitionTableResult = createTableMetadaResult +} + +// 2. Create data tables in target nodes. +func createDataTablesCallback(event *fsm.Event) { + req, err := procedure.GetRequestFromEvent[*callbackRequest](event) + if err != nil { + procedure.CancelEventWithLog(event, err, "get request from event") + return + } + params := req.p.params + if len(params.SubTablesShards) != len(params.SourceReq.GetPartitionTableInfo().SubTableNames) { + panic(fmt.Sprintf("shards number must be equal to sub tables number, shardNumber:%d, subTableNumber:%d", len(params.SubTablesShards), len(params.SourceReq.GetPartitionTableInfo().SubTableNames))) + return + } + + for i, subTableShard := range params.SubTablesShards { + createTableResult, err := ddl.CreateTableMetadata(req.ctx, params.ClusterMetadata, params.SourceReq.GetSchemaName(), params.SourceReq.GetPartitionTableInfo().SubTableNames[i], subTableShard.ShardInfo.ID, nil) + if err != nil { + procedure.CancelEventWithLog(event, err, "create table metadata") + return + } + + if err = ddl.CreateTableOnShard(req.ctx, params.ClusterMetadata, params.Dispatch, subTableShard.ShardInfo.ID, ddl.BuildCreateTableRequest(createTableResult, params.SourceReq, nil)); err != nil { + procedure.CancelEventWithLog(event, err, "dispatch create table on shard") + return + } + } +} + +func finishCallback(event *fsm.Event) { + req, err := procedure.GetRequestFromEvent[*callbackRequest](event) + if err != nil { + procedure.CancelEventWithLog(event, err, "get request from event") + return + } + log.Info("create partition table finish", zap.String("tableName", req.p.params.SourceReq.GetName())) + + if err := req.p.params.OnSucceeded(metadata.CreateTableResult{ + Table: req.p.createPartitionTableResult.Table, + ShardVersionUpdate: metadata.ShardVersionUpdate{}, + }); err != nil { + procedure.CancelEventWithLog(event, err, "create partition table on succeeded") + return + } +} + +func (p *Procedure) updateStateWithLock(state procedure.State) { + p.lock.Lock() + defer p.lock.Unlock() + + p.state = state +} + +func (p *Procedure) persist(ctx context.Context) error { + meta, err := p.convertToMeta() + if err != nil { + return errors.WithMessage(err, "convert to meta") + } + err = p.params.Storage.CreateOrUpdate(ctx, meta) + if err != nil { + return errors.WithMessage(err, "createOrUpdate procedure storage") + } + return nil +} + +// TODO: Replace rawData with structure defined by proto. +type rawData struct { + ID uint64 + FsmState string + State procedure.State + + CreateTableResult metadata.CreateTableResult + PartitionTableShards []metadata.ShardNodeWithVersion + SubTablesShards []metadata.ShardNodeWithVersion +} + +func (p *Procedure) convertToMeta() (procedure.Meta, error) { + p.lock.RLock() + defer p.lock.RUnlock() + + rawData := rawData{ + ID: p.params.ID, + FsmState: p.fsm.Current(), + State: p.state, + SubTablesShards: p.params.SubTablesShards, + } + rawDataBytes, err := json.Marshal(rawData) + if err != nil { + return procedure.Meta{}, procedure.ErrEncodeRawData.WithCausef("marshal raw data, procedureID:%v, err:%v", p.params.ID, err) + } + + meta := procedure.Meta{ + ID: p.params.ID, + Typ: procedure.CreatePartitionTable, + State: p.state, + + RawData: rawDataBytes, + } + + return meta, nil +} diff --git a/server/coordinator/procedure/dml/createpartitiontable/create_partition_table_test.go b/server/coordinator/procedure/ddl/createpartitiontable/create_partition_table_test.go similarity index 100% rename from server/coordinator/procedure/dml/createpartitiontable/create_partition_table_test.go rename to server/coordinator/procedure/ddl/createpartitiontable/create_partition_table_test.go diff --git a/server/coordinator/procedure/dml/createtable/create_table.go b/server/coordinator/procedure/ddl/createtable/create_table.go similarity index 62% rename from server/coordinator/procedure/dml/createtable/create_table.go rename to server/coordinator/procedure/ddl/createtable/create_table.go index ed8c5b76..0ed52e21 100644 --- a/server/coordinator/procedure/dml/createtable/create_table.go +++ b/server/coordinator/procedure/ddl/createtable/create_table.go @@ -8,9 +8,10 @@ import ( "github.com/CeresDB/ceresdbproto/golang/pkg/metaservicepb" "github.com/CeresDB/ceresmeta/pkg/log" - "github.com/CeresDB/ceresmeta/server/cluster" + "github.com/CeresDB/ceresmeta/server/cluster/metadata" "github.com/CeresDB/ceresmeta/server/coordinator/eventdispatch" "github.com/CeresDB/ceresmeta/server/coordinator/procedure" + "github.com/CeresDB/ceresmeta/server/coordinator/procedure/ddl" "github.com/CeresDB/ceresmeta/server/storage" "github.com/looplab/fsm" "github.com/pkg/errors" @@ -46,14 +47,15 @@ func prepareCallback(event *fsm.Event) { procedure.CancelEventWithLog(event, err, "get request from event") return } + params := req.p.params - createTableResult, err := procedure.CreateTableMetadata(req.ctx, req.cluster, req.sourceReq.GetSchemaName(), req.sourceReq.GetName(), req.shardID, nil) + createTableResult, err := ddl.CreateTableMetadata(req.ctx, params.ClusterMetadata, params.SourceReq.GetSchemaName(), params.SourceReq.GetName(), params.ShardID, nil) if err != nil { procedure.CancelEventWithLog(event, err, "create table metadata") return } - if err = procedure.CreateTableOnShard(req.ctx, req.cluster, req.dispatch, createTableResult.ShardVersionUpdate.ShardID, procedure.BuildCreateTableRequest(createTableResult, req.sourceReq, req.sourceReq.GetPartitionTableInfo().GetPartitionInfo())); err != nil { + if err = ddl.CreateTableOnShard(req.ctx, params.ClusterMetadata, params.Dispatch, createTableResult.ShardVersionUpdate.ShardID, ddl.BuildCreateTableRequest(createTableResult, params.SourceReq, params.SourceReq.GetPartitionTableInfo().GetPartitionInfo())); err != nil { procedure.CancelEventWithLog(event, err, "dispatch create table on shard") return } @@ -68,7 +70,7 @@ func successCallback(event *fsm.Event) { return } - if err := req.onSucceeded(req.createTableResult); err != nil { + if err := req.p.params.OnSucceeded(req.createTableResult); err != nil { log.Error("exec success callback failed") } } @@ -80,75 +82,70 @@ func failedCallback(event *fsm.Event) { return } - if err := req.onFailed(event.Err); err != nil { + if err := req.p.params.OnFailed(event.Err); err != nil { log.Error("exec failed callback failed") } } // callbackRequest is fsm callbacks param. type callbackRequest struct { - ctx context.Context - cluster *cluster.Cluster - dispatch eventdispatch.Dispatch - - sourceReq *metaservicepb.CreateTableRequest - shardID storage.ShardID - - onSucceeded func(cluster.CreateTableResult) error - onFailed func(error) error - - createTableResult cluster.CreateTableResult + ctx context.Context + p *Procedure + createTableResult metadata.CreateTableResult } -type ProcedureRequest struct { - Dispatch eventdispatch.Dispatch - Cluster *cluster.Cluster - ID uint64 - ShardID storage.ShardID - Req *metaservicepb.CreateTableRequest - OnSucceeded func(cluster.CreateTableResult) error - OnFailed func(error) error +type ProcedureParams struct { + Dispatch eventdispatch.Dispatch + ClusterMetadata *metadata.ClusterMetadata + ClusterSnapshot metadata.Snapshot + ID uint64 + ShardID storage.ShardID + SourceReq *metaservicepb.CreateTableRequest + OnSucceeded func(metadata.CreateTableResult) error + OnFailed func(error) error } -func NewProcedure(req ProcedureRequest) procedure.Procedure { +func NewProcedure(params ProcedureParams) procedure.Procedure { fsm := fsm.NewFSM( stateBegin, createTableEvents, createTableCallbacks, ) return &Procedure{ - id: req.ID, - fsm: fsm, - cluster: req.Cluster, - dispatch: req.Dispatch, - shardID: req.ShardID, - req: req.Req, - state: procedure.StateInit, - onSucceeded: req.OnSucceeded, - onFailed: req.OnFailed, + fsm: fsm, + params: params, + state: procedure.StateInit, } } type Procedure struct { - id uint64 - fsm *fsm.FSM - cluster *cluster.Cluster - dispatch eventdispatch.Dispatch - - shardID storage.ShardID - - req *metaservicepb.CreateTableRequest - - onSucceeded func(cluster.CreateTableResult) error - onFailed func(error) error - + fsm *fsm.FSM + params ProcedureParams // Protect the state. lock sync.RWMutex state procedure.State } +func (p *Procedure) RelatedVersionInfo() procedure.RelatedVersionInfo { + shardWithVersion := make(map[storage.ShardID]uint64, 1) + for _, shardView := range p.params.ClusterSnapshot.Topology.ShardViews { + if shardView.ShardID == p.params.ShardID { + shardWithVersion[p.params.ShardID] = shardView.Version + } + } + return procedure.RelatedVersionInfo{ + ClusterID: p.params.ClusterSnapshot.Topology.ClusterView.ClusterID, + ShardWithVersion: shardWithVersion, + ClusterVersion: p.params.ClusterSnapshot.Topology.ClusterView.Version, + } +} + +func (p *Procedure) Priority() procedure.Priority { + return procedure.PriorityLow +} + func (p *Procedure) ID() uint64 { - return p.id + return p.params.ID } func (p *Procedure) Typ() procedure.Typ { @@ -159,13 +156,8 @@ func (p *Procedure) Start(ctx context.Context) error { p.updateState(procedure.StateRunning) req := &callbackRequest{ - cluster: p.cluster, - ctx: ctx, - dispatch: p.dispatch, - shardID: p.shardID, - sourceReq: p.req, - onSucceeded: p.onSucceeded, - onFailed: p.onFailed, + ctx: ctx, + p: p, } if err := p.fsm.Event(eventPrepare, req); err != nil { diff --git a/server/coordinator/procedure/dml/createtable/create_table_test.go b/server/coordinator/procedure/ddl/createtable/create_table_test.go similarity index 100% rename from server/coordinator/procedure/dml/createtable/create_table_test.go rename to server/coordinator/procedure/ddl/createtable/create_table_test.go diff --git a/server/coordinator/procedure/dml/droppartitiontable/create_drop_partition_table_test.go b/server/coordinator/procedure/ddl/droppartitiontable/create_drop_partition_table_test.go similarity index 100% rename from server/coordinator/procedure/dml/droppartitiontable/create_drop_partition_table_test.go rename to server/coordinator/procedure/ddl/droppartitiontable/create_drop_partition_table_test.go diff --git a/server/coordinator/procedure/dml/droppartitiontable/drop_partition_table.go b/server/coordinator/procedure/ddl/droppartitiontable/drop_partition_table.go similarity index 53% rename from server/coordinator/procedure/dml/droppartitiontable/drop_partition_table.go rename to server/coordinator/procedure/ddl/droppartitiontable/drop_partition_table.go index 8e157dc7..eeed98a6 100644 --- a/server/coordinator/procedure/dml/droppartitiontable/drop_partition_table.go +++ b/server/coordinator/procedure/ddl/droppartitiontable/drop_partition_table.go @@ -10,7 +10,7 @@ import ( "github.com/CeresDB/ceresdbproto/golang/pkg/metaservicepb" "github.com/CeresDB/ceresmeta/pkg/log" - "github.com/CeresDB/ceresmeta/server/cluster" + "github.com/CeresDB/ceresmeta/server/cluster/metadata" "github.com/CeresDB/ceresmeta/server/coordinator/eventdispatch" "github.com/CeresDB/ceresmeta/server/coordinator/procedure" "github.com/CeresDB/ceresmeta/server/storage" @@ -20,84 +20,66 @@ import ( ) // fsm state change: -// ┌────────┐ ┌────────────────┐ ┌────────────────────┐ ┌──────────────────────┐ ┌───────────┐ -// │ Begin ├─────▶ DropDataTable ├─────▶ DropPartitionTable ├─────▶ ClosePartitionTables ├─────▶ Finish │ -// └────────┘ └────────────────┘ └────────────────────┘ └──────────────────────┘ └───────────┘ +// ┌────────┐ ┌────────────────┐ ┌────────────────────┐ ┌───────────┐ +// │ Begin ├─────▶ DropDataTable ├─────▶ DropPartitionTable ├──────▶ Finish │ +// └────────┘ └────────────────┘ └────────────────────┘ └───────────┘ const ( - eventDropDataTable = "EventDropDataTable" - eventDropPartitionTable = "EventDropPartitionTable" - eventClosePartitionTables = "EventClosePartitionTables" - eventFinish = "EventFinish" - - stateBegin = "StateBegin" - stateDropDataTable = "StateDropDataTable" - stateDropPartitionTable = "StateDropPartitionTable" - stateClosePartitionTables = "StateClosePartitionTables" - stateFinish = "StateFinish" + eventDropDataTable = "EventDropDataTable" + eventDropPartitionTable = "EventDropPartitionTable" + eventFinish = "EventFinish" + + stateBegin = "StateBegin" + stateDropDataTable = "StateDropDataTable" + stateDropPartitionTable = "StateDropPartitionTable" + stateFinish = "StateFinish" ) var ( createDropPartitionTableEvents = fsm.Events{ {Name: eventDropDataTable, Src: []string{stateBegin}, Dst: stateDropDataTable}, {Name: eventDropPartitionTable, Src: []string{stateDropDataTable}, Dst: stateDropPartitionTable}, - {Name: eventClosePartitionTables, Src: []string{stateDropPartitionTable}, Dst: stateClosePartitionTables}, - {Name: eventFinish, Src: []string{stateClosePartitionTables}, Dst: stateFinish}, + {Name: eventFinish, Src: []string{stateDropPartitionTable}, Dst: stateFinish}, } createDropPartitionTableCallbacks = fsm.Callbacks{ - eventDropDataTable: dropDataTablesCallback, - eventDropPartitionTable: dropPartitionTableCallback, - eventClosePartitionTables: closePartitionTableCallback, - eventFinish: finishCallback, + eventDropDataTable: dropDataTablesCallback, + eventDropPartitionTable: dropPartitionTableCallback, + eventFinish: finishCallback, } ) type Procedure struct { - id uint64 - fsm *fsm.FSM - cluster *cluster.Cluster - dispatch eventdispatch.Dispatch - storage procedure.Storage - - onSucceeded func(cluster.TableInfo) error - onFailed func(error) error - - req *metaservicepb.DropTableRequest + fsm *fsm.FSM + params ProcedureParams // Protect the state. lock sync.RWMutex state procedure.State } -type ProcedureRequest struct { - ID uint64 - Cluster *cluster.Cluster - Dispatch eventdispatch.Dispatch - Storage procedure.Storage - Request *metaservicepb.DropTableRequest - OnSucceeded func(result cluster.TableInfo) error - OnFailed func(error) error +type ProcedureParams struct { + ID uint64 + ClusterMetadata *metadata.ClusterMetadata + Dispatch eventdispatch.Dispatch + Storage procedure.Storage + SourceReq *metaservicepb.DropTableRequest + OnSucceeded func(result metadata.TableInfo) error + OnFailed func(error) error } -func NewProcedure(req ProcedureRequest) *Procedure { +func NewProcedure(params ProcedureParams) *Procedure { fsm := fsm.NewFSM( stateBegin, createDropPartitionTableEvents, createDropPartitionTableCallbacks, ) return &Procedure{ - id: req.ID, - fsm: fsm, - cluster: req.Cluster, - dispatch: req.Dispatch, - storage: req.Storage, - req: req.Request, - onSucceeded: req.OnSucceeded, - onFailed: req.OnFailed, + fsm: fsm, + params: params, } } func (p *Procedure) ID() uint64 { - return p.id + return p.params.ID } func (p *Procedure) Typ() procedure.Typ { @@ -108,12 +90,7 @@ func (p *Procedure) Start(ctx context.Context) error { p.updateStateWithLock(procedure.StateRunning) dropPartitionTableRequest := &callbackRequest{ - ctx: ctx, - cluster: p.cluster, - dispatch: p.dispatch, - req: p.req, - onSucceeded: p.onSucceeded, - onFailed: p.onFailed, + ctx: ctx, } for { @@ -135,20 +112,12 @@ func (p *Procedure) Start(ctx context.Context) error { return errors.WithMessage(err, "drop partition table procedure drop data table") } case stateDropPartitionTable: - if err := p.persist(ctx); err != nil { - return errors.WithMessage(err, "drop partition table procedure persist") - } - if err := p.fsm.Event(eventClosePartitionTables, dropPartitionTableRequest); err != nil { - p.updateStateWithLock(procedure.StateFailed) - return errors.WithMessage(err, "drop partition table procedure drop partition table") - } - case stateClosePartitionTables: if err := p.persist(ctx); err != nil { return errors.WithMessage(err, "drop partition table procedure persist") } if err := p.fsm.Event(eventFinish, dropPartitionTableRequest); err != nil { p.updateStateWithLock(procedure.StateFailed) - return errors.WithMessage(err, "drop partition table procedure close partition tables") + return errors.WithMessage(err, "drop partition table procedure drop partition table") } case stateFinish: p.updateStateWithLock(procedure.StateFinished) @@ -184,7 +153,7 @@ func (p *Procedure) persist(ctx context.Context) error { if err != nil { return errors.WithMessage(err, "convert to meta") } - err = p.storage.CreateOrUpdate(ctx, meta) + err = p.params.Storage.CreateOrUpdate(ctx, meta) if err != nil { return errors.WithMessage(err, "createOrUpdate procedure storage") } @@ -196,18 +165,18 @@ func (p *Procedure) convertToMeta() (procedure.Meta, error) { defer p.lock.RUnlock() rawData := rawData{ - ID: p.id, + ID: p.params.ID, FsmState: p.fsm.Current(), State: p.state, - DropTableRequest: p.req, + DropTableRequest: p.params.SourceReq, } rawDataBytes, err := json.Marshal(rawData) if err != nil { - return procedure.Meta{}, procedure.ErrEncodeRawData.WithCausef("marshal raw data, procedureID:%d, err:%v", p.id, err) + return procedure.Meta{}, procedure.ErrEncodeRawData.WithCausef("marshal raw data, procedureID:%d, err:%v", p.params.ID, err) } meta := procedure.Meta{ - ID: p.id, + ID: p.params.ID, Typ: procedure.DropPartitionTable, State: p.state, @@ -226,24 +195,18 @@ type rawData struct { } type callbackRequest struct { - ctx context.Context - cluster *cluster.Cluster - dispatch eventdispatch.Dispatch + ctx context.Context + p *Procedure - onSucceeded func(info cluster.TableInfo) error - onFailed func(error) error - - req *metaservicepb.DropTableRequest - versions []cluster.ShardVersionUpdate - table storage.Table + table storage.Table } func (d *callbackRequest) schemaName() string { - return d.req.GetSchemaName() + return d.p.params.SourceReq.GetSchemaName() } func (d *callbackRequest) tableName() string { - return d.req.GetName() + return d.p.params.SourceReq.GetName() } // 1. Drop data tables in target nodes. @@ -253,29 +216,30 @@ func dropDataTablesCallback(event *fsm.Event) { procedure.CancelEventWithLog(event, err, "get request from event") return } + params := req.p.params - if len(req.req.PartitionTableInfo.SubTableNames) == 0 { - procedure.CancelEventWithLog(event, procedure.ErrEmptyPartitionNames, fmt.Sprintf("drop table, table:%s", req.req.Name)) + if len(params.SourceReq.PartitionTableInfo.SubTableNames) == 0 { + procedure.CancelEventWithLog(event, procedure.ErrEmptyPartitionNames, fmt.Sprintf("drop table, table:%s", params.SourceReq.Name)) return } - for _, tableName := range req.req.PartitionTableInfo.SubTableNames { - table, dropTableResult, exists, err := dropTableMetaData(event, tableName) + for _, tableName := range params.SourceReq.PartitionTableInfo.SubTableNames { + dropTableResult, err := dropTableMetaData(event, tableName) if err != nil { procedure.CancelEventWithLog(event, err, fmt.Sprintf("drop table, table:%s", tableName)) return } - if !exists { + if !dropTableResult.exists { continue } - if len(dropTableResult.ShardVersionUpdate) != 1 { - procedure.CancelEventWithLog(event, procedure.ErrDropTableResult, fmt.Sprintf("legnth of shardVersionResult!=1, current is %d", len(dropTableResult.ShardVersionUpdate))) + if len(dropTableResult.result.ShardVersionUpdate) != 1 { + procedure.CancelEventWithLog(event, procedure.ErrDropTableResult, fmt.Sprintf("legnth of shardVersionResult!=1, current is %d", len(dropTableResult.result.ShardVersionUpdate))) return } - if err := dispatchDropTable(event, table, dropTableResult.ShardVersionUpdate[0]); err != nil { + if err := dispatchDropTable(event, dropTableResult.table, dropTableResult.result.ShardVersionUpdate[0]); err != nil { procedure.CancelEventWithLog(event, err, fmt.Sprintf("drop table, table:%s", tableName)) return } @@ -290,67 +254,28 @@ func dropPartitionTableCallback(event *fsm.Event) { return } - table, dropTableRet, exists, err := dropTableMetaData(event, req.tableName()) + dropTableResult, err := dropTableMetaData(event, req.tableName()) if err != nil { procedure.CancelEventWithLog(event, err, fmt.Sprintf("drop table, table:%s", req.tableName())) return } - if !exists { + if !dropTableResult.exists { procedure.CancelEventWithLog(event, procedure.ErrTableNotExists, fmt.Sprintf("table:%s", req.tableName())) return } - if len(dropTableRet.ShardVersionUpdate) == 0 { - procedure.CancelEventWithLog(event, procedure.ErrDropTableResult, fmt.Sprintf("legnth of shardVersionResult need >=1, current is %d", len(dropTableRet.ShardVersionUpdate))) + if len(dropTableResult.result.ShardVersionUpdate) == 0 { + procedure.CancelEventWithLog(event, procedure.ErrDropTableResult, fmt.Sprintf("legnth of shardVersionResult need >=1, current is %d", len(dropTableResult.result.ShardVersionUpdate))) return } - req.versions = dropTableRet.ShardVersionUpdate - req.table = table + req.table = dropTableResult.table // Drop table in the first shard. - if err := dispatchDropTable(event, table, dropTableRet.ShardVersionUpdate[0]); err != nil { - procedure.CancelEventWithLog(event, err, fmt.Sprintf("drop table, table:%s", table.Name)) - return - } -} - -// 3. Close partition table in target node. -func closePartitionTableCallback(event *fsm.Event) { - request, err := procedure.GetRequestFromEvent[*callbackRequest](event) - if err != nil { - procedure.CancelEventWithLog(event, err, "get request from event") + if err := dispatchDropTable(event, dropTableResult.table, dropTableResult.result.ShardVersionUpdate[0]); err != nil { + procedure.CancelEventWithLog(event, err, fmt.Sprintf("drop table, table:%s", dropTableResult.table.Name)) return } - - tableInfo := cluster.TableInfo{ - ID: request.table.ID, - Name: request.table.Name, - SchemaID: request.table.SchemaID, - SchemaName: request.req.GetSchemaName(), - } - - for _, version := range request.versions[1:] { - shardNodes, err := request.cluster.GetShardNodesByShardID(version.ShardID) - if err != nil { - procedure.CancelEventWithLog(event, err, "get shard nodes by shard id") - return - } - // Close partition table shard. - for _, shardNode := range shardNodes { - if err = request.dispatch.CloseTableOnShard(request.ctx, shardNode.NodeName, eventdispatch.CloseTableOnShardRequest{ - UpdateShardInfo: eventdispatch.UpdateShardInfo{CurrShardInfo: cluster.ShardInfo{ - ID: shardNode.ID, - Role: shardNode.ShardRole, - Version: version.CurrVersion, - }, PrevVersion: version.PrevVersion}, - TableInfo: tableInfo, - }); err != nil { - procedure.CancelEventWithLog(event, err, "close shard") - return - } - } - } } func finishCallback(event *fsm.Event) { @@ -361,64 +286,81 @@ func finishCallback(event *fsm.Event) { } log.Info("drop partition table finish") - tableInfo := cluster.TableInfo{ + tableInfo := metadata.TableInfo{ ID: request.table.ID, Name: request.table.Name, SchemaID: request.table.SchemaID, - SchemaName: request.req.GetSchemaName(), + SchemaName: request.p.params.SourceReq.GetSchemaName(), } - if err = request.onSucceeded(tableInfo); err != nil { + if err = request.p.params.OnSucceeded(tableInfo); err != nil { procedure.CancelEventWithLog(event, err, "drop partition table on succeeded") return } } -func dropTableMetaData(event *fsm.Event, tableName string) (storage.Table, cluster.DropTableResult, bool, error) { +type DropTableMetaDataResult struct { + table storage.Table + result metadata.DropTableResult + exists bool +} + +func dropTableMetaData(event *fsm.Event, tableName string) (DropTableMetaDataResult, error) { request, err := procedure.GetRequestFromEvent[*callbackRequest](event) if err != nil { - return storage.Table{}, cluster.DropTableResult{}, false, errors.WithMessage(err, "get request from event") + return DropTableMetaDataResult{ + table: storage.Table{}, + result: metadata.DropTableResult{}, + exists: false, + }, errors.WithMessage(err, "get request from event") } - table, exists, err := request.cluster.GetTable(request.schemaName(), tableName) + table, exists, err := request.p.params.ClusterMetadata.GetTable(request.schemaName(), tableName) if err != nil { - return storage.Table{}, cluster.DropTableResult{}, false, errors.WithMessage(err, "cluster get table") + return DropTableMetaDataResult{ + table: storage.Table{}, + result: metadata.DropTableResult{}, + exists: false, + }, errors.WithMessage(err, "cluster get table") } if !exists { log.Warn("drop non-existing table", zap.String("schema", request.schemaName()), zap.String("table", tableName)) - return storage.Table{}, cluster.DropTableResult{}, false, nil + return DropTableMetaDataResult{storage.Table{}, metadata.DropTableResult{}, false}, nil } - result, err := request.cluster.DropTable(request.ctx, request.schemaName(), tableName) + result, err := request.p.params.ClusterMetadata.DropTable(request.ctx, request.schemaName(), tableName) if err != nil { - return storage.Table{}, cluster.DropTableResult{}, false, errors.WithMessage(err, "cluster drop table") + return DropTableMetaDataResult{storage.Table{}, metadata.DropTableResult{}, false}, errors.WithMessage(err, "cluster drop table") } - - return table, result, true, nil + return DropTableMetaDataResult{ + table: table, + result: result, + exists: true, + }, nil } -func dispatchDropTable(event *fsm.Event, table storage.Table, version cluster.ShardVersionUpdate) error { +func dispatchDropTable(event *fsm.Event, table storage.Table, version metadata.ShardVersionUpdate) error { request, err := procedure.GetRequestFromEvent[*callbackRequest](event) if err != nil { return errors.WithMessage(err, "get request from event") } - shardNodes, err := request.cluster.GetShardNodesByShardID(version.ShardID) + shardNodes, err := request.p.params.ClusterMetadata.GetShardNodesByShardID(version.ShardID) if err != nil { procedure.CancelEventWithLog(event, err, "get shard nodes by shard id") return errors.WithMessage(err, "cluster get shard by shard id") } - tableInfo := cluster.TableInfo{ + tableInfo := metadata.TableInfo{ ID: table.ID, Name: table.Name, SchemaID: table.SchemaID, - SchemaName: request.req.GetSchemaName(), + SchemaName: request.p.params.SourceReq.GetSchemaName(), } for _, shardNode := range shardNodes { - err = request.dispatch.DropTableOnShard(request.ctx, shardNode.NodeName, eventdispatch.DropTableOnShardRequest{ + err = request.p.params.Dispatch.DropTableOnShard(request.ctx, shardNode.NodeName, eventdispatch.DropTableOnShardRequest{ UpdateShardInfo: eventdispatch.UpdateShardInfo{ - CurrShardInfo: cluster.ShardInfo{ + CurrShardInfo: metadata.ShardInfo{ ID: version.ShardID, Role: storage.ShardRoleLeader, Version: version.CurrVersion, diff --git a/server/coordinator/procedure/dml/droptable/create_drop_table_test.go b/server/coordinator/procedure/ddl/droptable/create_drop_table_test.go similarity index 100% rename from server/coordinator/procedure/dml/droptable/create_drop_table_test.go rename to server/coordinator/procedure/ddl/droptable/create_drop_table_test.go diff --git a/server/coordinator/procedure/dml/droptable/drop_table.go b/server/coordinator/procedure/ddl/droptable/drop_table.go similarity index 56% rename from server/coordinator/procedure/dml/droptable/drop_table.go rename to server/coordinator/procedure/ddl/droptable/drop_table.go index 265dbf3e..3ca0508b 100644 --- a/server/coordinator/procedure/dml/droptable/drop_table.go +++ b/server/coordinator/procedure/ddl/droptable/drop_table.go @@ -9,7 +9,7 @@ import ( "github.com/CeresDB/ceresdbproto/golang/pkg/metaservicepb" "github.com/CeresDB/ceresmeta/pkg/log" - "github.com/CeresDB/ceresmeta/server/cluster" + "github.com/CeresDB/ceresmeta/server/cluster/metadata" "github.com/CeresDB/ceresmeta/server/coordinator/eventdispatch" "github.com/CeresDB/ceresmeta/server/coordinator/procedure" "github.com/CeresDB/ceresmeta/server/storage" @@ -48,24 +48,25 @@ func prepareCallback(event *fsm.Event) { procedure.CancelEventWithLog(event, err, "get request from event") return } + params := req.p.params - table, exists, err := req.cluster.GetTable(req.rawReq.GetSchemaName(), req.rawReq.GetName()) + table, exists, err := params.ClusterMetadata.GetTable(params.SourceReq.GetSchemaName(), params.SourceReq.GetName()) if err != nil { procedure.CancelEventWithLog(event, err, "cluster get table") return } if !exists { - log.Warn("drop non-existing table", zap.String("schema", req.rawReq.GetSchemaName()), zap.String("table", req.rawReq.GetName())) + log.Warn("drop non-existing table", zap.String("schema", params.SourceReq.GetSchemaName()), zap.String("table", params.SourceReq.GetName())) return } - shardNodesResult, err := req.cluster.GetShardNodeByTableIDs([]storage.TableID{table.ID}) + shardNodesResult, err := params.ClusterMetadata.GetShardNodeByTableIDs([]storage.TableID{table.ID}) if err != nil { procedure.CancelEventWithLog(event, err, "cluster get shard by table id") return } - result, err := req.cluster.DropTable(req.ctx, req.rawReq.GetSchemaName(), req.rawReq.GetName()) + result, err := params.ClusterMetadata.DropTable(req.ctx, params.SourceReq.GetSchemaName(), params.SourceReq.GetName()) if err != nil { procedure.CancelEventWithLog(event, err, "cluster drop table") return @@ -98,15 +99,15 @@ func prepareCallback(event *fsm.Event) { return } - tableInfo := cluster.TableInfo{ + tableInfo := metadata.TableInfo{ ID: table.ID, Name: table.Name, SchemaID: table.SchemaID, - SchemaName: req.rawReq.GetSchemaName(), + SchemaName: params.SourceReq.GetSchemaName(), } - err = req.dispatch.DropTableOnShard(req.ctx, leader.NodeName, eventdispatch.DropTableOnShardRequest{ + err = params.Dispatch.DropTableOnShard(req.ctx, leader.NodeName, eventdispatch.DropTableOnShardRequest{ UpdateShardInfo: eventdispatch.UpdateShardInfo{ - CurrShardInfo: cluster.ShardInfo{ + CurrShardInfo: metadata.ShardInfo{ ID: result.ShardVersionUpdate[0].ShardID, Role: storage.ShardRoleLeader, Version: result.ShardVersionUpdate[0].CurrVersion, @@ -126,7 +127,7 @@ func prepareCallback(event *fsm.Event) { func successCallback(event *fsm.Event) { req := event.Args[0].(*callbackRequest) - if err := req.onSucceeded(req.ret); err != nil { + if err := req.p.params.OnSucceeded(req.ret); err != nil { log.Error("exec success callback failed") } } @@ -134,59 +135,109 @@ func successCallback(event *fsm.Event) { func failedCallback(event *fsm.Event) { req := event.Args[0].(*callbackRequest) - if err := req.onFailed(event.Err); err != nil { + if err := req.p.params.OnFailed(event.Err); err != nil { log.Error("exec failed callback failed") } } // callbackRequest is fsm callbacks param. type callbackRequest struct { - ctx context.Context - cluster *cluster.Cluster - dispatch eventdispatch.Dispatch + ctx context.Context + p *Procedure - rawReq *metaservicepb.DropTableRequest + ret metadata.TableInfo +} - onSucceeded func(cluster.TableInfo) error - onFailed func(error) error +type ProcedureParams struct { + ID uint64 + Dispatch eventdispatch.Dispatch + ClusterMetadata *metadata.ClusterMetadata + ClusterSnapshot metadata.Snapshot - ret cluster.TableInfo + SourceReq *metaservicepb.DropTableRequest + OnSucceeded func(metadata.TableInfo) error + OnFailed func(error) error } -func NewDropTableProcedure(dispatch eventdispatch.Dispatch, cluster *cluster.Cluster, id uint64, req *metaservicepb.DropTableRequest, onSucceeded func(cluster.TableInfo) error, onFailed func(error) error) procedure.Procedure { +func NewDropTableProcedure(params ProcedureParams) (procedure.Procedure, error) { + shardID, err := validateTable(params) + if err != nil { + return nil, err + } + + relatedVersionInfo := buildRelatedVersionInfo(params, shardID) + fsm := fsm.NewFSM( stateBegin, dropTableEvents, dropTableCallbacks, ) + return &Procedure{ - id: id, - fsm: fsm, - cluster: cluster, - dispatch: dispatch, - req: req, - onSucceeded: onSucceeded, - onFailed: onFailed, - state: procedure.StateInit, + fsm: fsm, + shardID: shardID, + relatedVersionInfo: relatedVersionInfo, + params: params, + state: procedure.StateInit, + }, nil +} + +func buildRelatedVersionInfo(params ProcedureParams, shardID storage.ShardID) procedure.RelatedVersionInfo { + shardWithVersion := make(map[storage.ShardID]uint64, 1) + for _, shardView := range params.ClusterSnapshot.Topology.ShardViews { + if shardView.ShardID == shardID { + shardWithVersion[shardID] = shardView.Version + } + } + return procedure.RelatedVersionInfo{ + ClusterID: params.ClusterSnapshot.Topology.ClusterView.ClusterID, + ShardWithVersion: shardWithVersion, + ClusterVersion: params.ClusterSnapshot.Topology.ClusterView.Version, } } -type Procedure struct { - id uint64 - fsm *fsm.FSM - cluster *cluster.Cluster - dispatch eventdispatch.Dispatch - req *metaservicepb.DropTableRequest +func validateTable(params ProcedureParams) (storage.ShardID, error) { + table, exists, err := params.ClusterMetadata.GetTable(params.SourceReq.GetSchemaName(), params.SourceReq.GetName()) + if err != nil { + log.Error("get table", zap.Error(err)) + return 0, err + } + if !exists { + log.Error("drop non-existing table", zap.String("schema", params.SourceReq.GetSchemaName()), zap.String("table", params.SourceReq.GetName())) + return 0, err + } + + for _, shardView := range params.ClusterSnapshot.Topology.ShardViews { + for _, tableID := range shardView.TableIDs { + if table.ID == tableID { + return shardView.ShardID, nil + } + } + } + + return 0, errors.WithMessagef(metadata.ErrShardNotFound, "The shard corresponding to the table was not found, schema:%s, table:%s", params.SourceReq.GetSchemaName(), params.SourceReq.GetName()) +} - onSucceeded func(cluster.TableInfo) error - onFailed func(error) error +type Procedure struct { + fsm *fsm.FSM + shardID storage.ShardID + relatedVersionInfo procedure.RelatedVersionInfo + params ProcedureParams lock sync.RWMutex state procedure.State } +func (p *Procedure) RelatedVersionInfo() procedure.RelatedVersionInfo { + return p.relatedVersionInfo +} + +func (p *Procedure) Priority() procedure.Priority { + return procedure.PriorityLow +} + func (p *Procedure) ID() uint64 { - return p.id + return p.params.ID } func (p *Procedure) Typ() procedure.Typ { @@ -197,12 +248,8 @@ func (p *Procedure) Start(ctx context.Context) error { p.updateState(procedure.StateRunning) req := &callbackRequest{ - cluster: p.cluster, - ctx: ctx, - dispatch: p.dispatch, - rawReq: p.req, - onSucceeded: p.onSucceeded, - onFailed: p.onFailed, + ctx: ctx, + p: p, } if err := p.fsm.Event(eventPrepare, req); err != nil { diff --git a/server/coordinator/procedure/dml/createpartitiontable/create_partition_table.go b/server/coordinator/procedure/dml/createpartitiontable/create_partition_table.go deleted file mode 100644 index 60d1ecb7..00000000 --- a/server/coordinator/procedure/dml/createpartitiontable/create_partition_table.go +++ /dev/null @@ -1,407 +0,0 @@ -// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0. - -package createpartitiontable - -import ( - "context" - "encoding/json" - "sync" - - "github.com/CeresDB/ceresdbproto/golang/pkg/metaservicepb" - "github.com/CeresDB/ceresmeta/pkg/log" - "github.com/CeresDB/ceresmeta/server/cluster" - "github.com/CeresDB/ceresmeta/server/coordinator/eventdispatch" - "github.com/CeresDB/ceresmeta/server/coordinator/procedure" - "github.com/CeresDB/ceresmeta/server/storage" - "github.com/looplab/fsm" - "github.com/pkg/errors" - "go.uber.org/zap" -) - -// fsm state change: -// ┌────────┐ ┌──────────────────────┐ ┌────────────────────┐ ┌────────────────────┐ ┌───────────┐ -// │ Begin ├─────▶ CreatePartitionTable ├─────▶ CreateDataTables ├─────▶OpenPartitionTables ├─────▶ Finish │ -// └────────┘ └──────────────────────┘ └────────────────────┘ └────────────────────┘ └───────────┘ -const ( - eventCreatePartitionTable = "EventCreatePartitionTable" - eventCreateSubTables = "EventCreateSubTables" - eventUpdateTableShardMetadata = "EventUpdateTableShardMetadata" - eventOpenPartitionTables = "EventOpenPartitionTables" - eventFinish = "EventFinish" - - stateBegin = "StateBegin" - stateCreatePartitionTable = "StateCreatePartitionTable" - stateCreateSubTables = "StateCreateSubTables" - stateUpdateTableShardMetadata = "StateUpdateTableShardMetadata" - stateOpenPartitionTables = "StateOpenPartitionTables" - stateFinish = "StateFinish" -) - -var ( - createPartitionTableEvents = fsm.Events{ - {Name: eventCreatePartitionTable, Src: []string{stateBegin}, Dst: stateCreatePartitionTable}, - {Name: eventCreateSubTables, Src: []string{stateCreatePartitionTable}, Dst: stateCreateSubTables}, - {Name: eventUpdateTableShardMetadata, Src: []string{stateCreateSubTables}, Dst: stateUpdateTableShardMetadata}, - {Name: eventOpenPartitionTables, Src: []string{stateUpdateTableShardMetadata}, Dst: stateOpenPartitionTables}, - {Name: eventFinish, Src: []string{stateOpenPartitionTables}, Dst: stateFinish}, - } - createPartitionTableCallbacks = fsm.Callbacks{ - eventCreatePartitionTable: createPartitionTableCallback, - eventCreateSubTables: createDataTablesCallback, - eventUpdateTableShardMetadata: openPartitionTableMetadataCallback, - eventOpenPartitionTables: openPartitionTableCallback, - eventFinish: finishCallback, - } -) - -type Procedure struct { - id uint64 - fsm *fsm.FSM - cluster *cluster.Cluster - dispatch eventdispatch.Dispatch - storage procedure.Storage - - req *metaservicepb.CreateTableRequest - - partitionTableShards []cluster.ShardNodeWithVersion - subTablesShards []cluster.ShardNodeWithVersion - - onSucceeded func(cluster.CreateTableResult) error - onFailed func(error) error - - lock sync.RWMutex - state procedure.State -} - -type ProcedureRequest struct { - ID uint64 - Cluster *cluster.Cluster - Dispatch eventdispatch.Dispatch - Storage procedure.Storage - Req *metaservicepb.CreateTableRequest - PartitionTableShards []cluster.ShardNodeWithVersion - SubTablesShards []cluster.ShardNodeWithVersion - OnSucceeded func(cluster.CreateTableResult) error - OnFailed func(error) error -} - -func NewProcedure(req ProcedureRequest) *Procedure { - fsm := fsm.NewFSM( - stateBegin, - createPartitionTableEvents, - createPartitionTableCallbacks, - ) - return &Procedure{ - id: req.ID, - fsm: fsm, - cluster: req.Cluster, - dispatch: req.Dispatch, - storage: req.Storage, - req: req.Req, - partitionTableShards: req.PartitionTableShards, - subTablesShards: req.SubTablesShards, - onSucceeded: req.OnSucceeded, - onFailed: req.OnFailed, - } -} - -func (p *Procedure) ID() uint64 { - return p.id -} - -func (p *Procedure) Typ() procedure.Typ { - return procedure.CreatePartitionTable -} - -func (p *Procedure) Start(ctx context.Context) error { - p.updateStateWithLock(procedure.StateRunning) - - createPartitionTableRequest := &callbackRequest{ - ctx: ctx, - cluster: p.cluster, - dispatch: p.dispatch, - sourceReq: p.req, - partitionTableShards: p.partitionTableShards, - subTablesShards: p.subTablesShards, - onSucceeded: p.onSucceeded, - onFailed: p.onFailed, - } - - for { - switch p.fsm.Current() { - case stateBegin: - if err := p.persist(ctx); err != nil { - return errors.WithMessage(err, "persist create partition table procedure") - } - if err := p.fsm.Event(eventCreatePartitionTable, createPartitionTableRequest); err != nil { - p.updateStateWithLock(procedure.StateFailed) - return errors.WithMessage(err, "create partition table") - } - case stateCreatePartitionTable: - if err := p.persist(ctx); err != nil { - return errors.WithMessage(err, "persist create partition table procedure") - } - if err := p.fsm.Event(eventCreateSubTables, createPartitionTableRequest); err != nil { - p.updateStateWithLock(procedure.StateFailed) - return errors.WithMessage(err, "create data tables") - } - case stateCreateSubTables: - if err := p.persist(ctx); err != nil { - return errors.WithMessage(err, "persist create partition table procedure") - } - if err := p.fsm.Event(eventUpdateTableShardMetadata, createPartitionTableRequest); err != nil { - p.updateStateWithLock(procedure.StateFailed) - return errors.WithMessage(err, "update table shard metadata") - } - case stateUpdateTableShardMetadata: - if err := p.persist(ctx); err != nil { - return errors.WithMessage(err, "persist create partition table procedure") - } - if err := p.fsm.Event(eventOpenPartitionTables, createPartitionTableRequest); err != nil { - p.updateStateWithLock(procedure.StateFailed) - return errors.WithMessage(err, "open partition tables") - } - case stateOpenPartitionTables: - if err := p.persist(ctx); err != nil { - return errors.WithMessage(err, "persist create partition table procedure") - } - if err := p.fsm.Event(eventFinish, createPartitionTableRequest); err != nil { - p.updateStateWithLock(procedure.StateFailed) - return errors.WithMessage(err, "finish") - } - case stateFinish: - // TODO: The state update sequence here is inconsistent with the previous one. Consider reconstructing the state update logic of the state machine. - p.updateStateWithLock(procedure.StateFinished) - if err := p.persist(ctx); err != nil { - return errors.WithMessage(err, "create partition table procedure persist") - } - return nil - } - } -} - -func (p *Procedure) Cancel(_ context.Context) error { - p.updateStateWithLock(procedure.StateCancelled) - return nil -} - -func (p *Procedure) State() procedure.State { - p.lock.RLock() - defer p.lock.RUnlock() - - return p.state -} - -type callbackRequest struct { - ctx context.Context - cluster *cluster.Cluster - dispatch eventdispatch.Dispatch - - sourceReq *metaservicepb.CreateTableRequest - - onSucceeded func(cluster.CreateTableResult) error - onFailed func(error) error - - createTableResult cluster.CreateTableResult - partitionTableShards []cluster.ShardNodeWithVersion - subTablesShards []cluster.ShardNodeWithVersion - versions []cluster.ShardVersionUpdate -} - -// 1. Create partition table in target node. -func createPartitionTableCallback(event *fsm.Event) { - req, err := procedure.GetRequestFromEvent[*callbackRequest](event) - if err != nil { - procedure.CancelEventWithLog(event, err, "get request from event") - return - } - - // Select first shard to create partition table. - partTableShardNode := req.partitionTableShards[0] - - partInfo := req.sourceReq.GetPartitionTableInfo().GetPartitionInfo() - createTableResult, err := procedure.CreateTableMetadata(req.ctx, req.cluster, req.sourceReq.GetSchemaName(), req.sourceReq.GetName(), partTableShardNode.ShardInfo.ID, partInfo) - if err != nil { - procedure.CancelEventWithLog(event, err, "create table metadata") - return - } - req.createTableResult = createTableResult - - if err = procedure.CreateTableOnShard(req.ctx, req.cluster, req.dispatch, partTableShardNode.ShardInfo.ID, procedure.BuildCreateTableRequest(createTableResult, req.sourceReq, partInfo)); err != nil { - procedure.CancelEventWithLog(event, err, "dispatch create table on shard") - return - } -} - -// 2. Create data tables in target nodes. -func createDataTablesCallback(event *fsm.Event) { - req, err := procedure.GetRequestFromEvent[*callbackRequest](event) - if err != nil { - procedure.CancelEventWithLog(event, err, "get request from event") - return - } - - for i, subTableShard := range req.subTablesShards { - createTableResult, err := procedure.CreateTableMetadata(req.ctx, req.cluster, req.sourceReq.GetSchemaName(), req.sourceReq.GetPartitionTableInfo().SubTableNames[i], subTableShard.ShardInfo.ID, nil) - if err != nil { - procedure.CancelEventWithLog(event, err, "create table metadata") - return - } - - if err = procedure.CreateTableOnShard(req.ctx, req.cluster, req.dispatch, subTableShard.ShardInfo.ID, procedure.BuildCreateTableRequest(createTableResult, req.sourceReq, nil)); err != nil { - procedure.CancelEventWithLog(event, err, "dispatch create table on shard") - return - } - } -} - -// 3. Update table shard mapping. -func openPartitionTableMetadataCallback(event *fsm.Event) { - req, err := procedure.GetRequestFromEvent[*callbackRequest](event) - if err != nil { - procedure.CancelEventWithLog(event, err, "get request from event") - return - } - - req.partitionTableShards = append(req.partitionTableShards[:0], req.partitionTableShards[1:]...) - versions := make([]cluster.ShardVersionUpdate, 0, len(req.partitionTableShards)) - for _, partitionTableShard := range req.partitionTableShards { - shardVersionUpdate, err := req.cluster.OpenTable(req.ctx, - cluster.OpenTableRequest{ - SchemaName: req.sourceReq.SchemaName, - TableName: req.sourceReq.Name, - ShardID: partitionTableShard.ShardInfo.ID, - }) - if err != nil { - procedure.CancelEventWithLog(event, err, "open table") - return - } - versions = append(versions, shardVersionUpdate) - } - req.versions = versions -} - -// 4. Open table on target shard. -func openPartitionTableCallback(event *fsm.Event) { - req, err := procedure.GetRequestFromEvent[*callbackRequest](event) - if err != nil { - procedure.CancelEventWithLog(event, err, "get request from event") - return - } - table, exists, err := req.cluster.GetTable(req.sourceReq.SchemaName, req.sourceReq.Name) - if err != nil { - log.Error("get table", zap.Error(err)) - procedure.CancelEventWithLog(event, err, "get table") - return - } - - if !exists { - procedure.CancelEventWithLog(event, err, "the table to be closed does not exist") - return - } - - for _, version := range req.versions { - shardNodes, err := req.cluster.GetShardNodesByShardID(version.ShardID) - if err != nil { - procedure.CancelEventWithLog(event, err, "get shard nodes by shard id") - return - } - - for _, shardNode := range shardNodes { - // Open partition table on target shard. - if err := req.dispatch.OpenTableOnShard(req.ctx, shardNode.NodeName, - eventdispatch.OpenTableOnShardRequest{ - UpdateShardInfo: eventdispatch.UpdateShardInfo{ - CurrShardInfo: cluster.ShardInfo{ - ID: shardNode.ID, - Role: shardNode.ShardRole, - Version: version.CurrVersion, - }, - PrevVersion: version.PrevVersion, - }, - TableInfo: cluster.TableInfo{ - ID: table.ID, - Name: table.Name, - SchemaID: table.SchemaID, - SchemaName: req.sourceReq.SchemaName, - PartitionInfo: storage.PartitionInfo{ - Info: req.sourceReq.GetPartitionTableInfo().GetPartitionInfo(), - }, - }, - }); err != nil { - procedure.CancelEventWithLog(event, err, "open table on shard") - return - } - } - } -} - -func finishCallback(event *fsm.Event) { - req, err := procedure.GetRequestFromEvent[*callbackRequest](event) - if err != nil { - procedure.CancelEventWithLog(event, err, "get request from event") - return - } - log.Info("create partition table finish") - - if err := req.onSucceeded(req.createTableResult); err != nil { - procedure.CancelEventWithLog(event, err, "create partition table on succeeded") - return - } -} - -func (p *Procedure) updateStateWithLock(state procedure.State) { - p.lock.Lock() - defer p.lock.Unlock() - - p.state = state -} - -func (p *Procedure) persist(ctx context.Context) error { - meta, err := p.convertToMeta() - if err != nil { - return errors.WithMessage(err, "convert to meta") - } - err = p.storage.CreateOrUpdate(ctx, meta) - if err != nil { - return errors.WithMessage(err, "createOrUpdate procedure storage") - } - return nil -} - -type rawData struct { - ID uint64 - FsmState string - State procedure.State - - CreateTableResult cluster.CreateTableResult - PartitionTableShards []cluster.ShardNodeWithVersion - SubTablesShards []cluster.ShardNodeWithVersion -} - -func (p *Procedure) convertToMeta() (procedure.Meta, error) { - p.lock.RLock() - defer p.lock.RUnlock() - - rawData := rawData{ - ID: p.id, - FsmState: p.fsm.Current(), - State: p.state, - PartitionTableShards: p.partitionTableShards, - SubTablesShards: p.subTablesShards, - } - rawDataBytes, err := json.Marshal(rawData) - if err != nil { - return procedure.Meta{}, procedure.ErrEncodeRawData.WithCausef("marshal raw data, procedureID:%v, err:%v", p.id, err) - } - - meta := procedure.Meta{ - ID: p.id, - Typ: procedure.CreatePartitionTable, - State: p.state, - - RawData: rawDataBytes, - } - - return meta, nil -} diff --git a/server/coordinator/procedure/error.go b/server/coordinator/procedure/error.go index d7dad33f..44c2f5bb 100644 --- a/server/coordinator/procedure/error.go +++ b/server/coordinator/procedure/error.go @@ -21,4 +21,5 @@ var ( ErrSubmitProcedure = coderr.NewCodeError(coderr.Internal, "submit new procedure") ErrQueueFull = coderr.NewCodeError(coderr.Internal, "queue is full, unable to offer more data") ErrPushDuplicatedProcedure = coderr.NewCodeError(coderr.Internal, "try to push duplicated procedure") + ErrShardNumberNotEnough = coderr.NewCodeError(coderr.Internal, "shard number not enough") )