Skip to content
This repository has been archived by the owner on Feb 6, 2024. It is now read-only.

Commit

Permalink
refactor: refactor ddl procedures (#155)
Browse files Browse the repository at this point in the history
  • Loading branch information
ZuLiangWang authored Apr 14, 2023
1 parent 93f830d commit 1dadcb1
Show file tree
Hide file tree
Showing 14 changed files with 593 additions and 672 deletions.
26 changes: 26 additions & 0 deletions server/cluster/metadata/cluster_metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,32 @@ func (c *ClusterMetadata) GetTable(schemaName, tableName string) (storage.Table,
return c.tableManager.GetTable(schemaName, tableName)
}

func (c *ClusterMetadata) CreateTableMetadata(ctx context.Context, request CreateTableMetadataRequest) (CreateTableMetadataResult, error) {
log.Info("create table start", zap.String("cluster", c.Name()), zap.String("schemaName", request.SchemaName), zap.String("tableName", request.TableName))

_, exists, err := c.tableManager.GetTable(request.SchemaName, request.TableName)
if err != nil {
return CreateTableMetadataResult{}, err
}

if exists {
return CreateTableMetadataResult{}, ErrTableAlreadyExists
}

// Create table in table manager.
table, err := c.tableManager.CreateTable(ctx, request.SchemaName, request.TableName, request.PartitionInfo)
if err != nil {
return CreateTableMetadataResult{}, errors.WithMessage(err, "table manager create table")
}

res := CreateTableMetadataResult{
Table: table,
}

log.Info("create table metadata succeed", zap.String("cluster", c.Name()), zap.String("result", fmt.Sprintf("%+v", res)), zap.Object("result", res))
return res, nil
}

func (c *ClusterMetadata) CreateTable(ctx context.Context, request CreateTableRequest) (CreateTableResult, error) {
log.Info("create table start", zap.String("cluster", c.Name()), zap.String("schemaName", request.SchemaName), zap.String("tableName", request.TableName))

Expand Down
10 changes: 10 additions & 0 deletions server/cluster/metadata/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,16 @@ type CreateClusterOpts struct {
ShardTotal uint32
}

type CreateTableMetadataRequest struct {
SchemaName string
TableName string
PartitionInfo storage.PartitionInfo
}

type CreateTableMetadataResult struct {
Table storage.Table
}

type CreateTableRequest struct {
ShardID storage.ShardID
SchemaName string
Expand Down
31 changes: 16 additions & 15 deletions server/coordinator/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ type Factory struct {
}

type CreateTableRequest struct {
Cluster *metadata.ClusterMetadata
SourceReq *metaservicepb.CreateTableRequest
ClusterMetadata *metadata.ClusterMetadata
SourceReq *metaservicepb.CreateTableRequest

OnSucceeded func(metadata.CreateTableResult) error
OnFailed func(error) error
Expand All @@ -49,8 +49,8 @@ func (request *CreateTableRequest) isPartitionTable() bool {
}

type DropTableRequest struct {
Cluster *metadata.ClusterMetadata
SourceReq *metaservicepb.DropTableRequest
ClusterMetadata *metadata.ClusterMetadata
SourceReq *metaservicepb.DropTableRequest

OnSucceeded func(metadata.TableInfo) error
OnFailed func(error) error
Expand Down Expand Up @@ -102,7 +102,7 @@ func (f *Factory) MakeCreateTableProcedure(ctx context.Context, request CreateTa

if isPartitionTable {
return f.makeCreatePartitionTableProcedure(ctx, CreatePartitionTableRequest{
Cluster: request.Cluster,
Cluster: request.ClusterMetadata,
SourceReq: request.SourceReq,
PartitionTableRatioOfNodes: f.partitionTableProportionOfNodes,
OnSucceeded: request.OnSucceeded,
Expand All @@ -118,6 +118,8 @@ func (f *Factory) makeCreateTableProcedure(ctx context.Context, request CreateTa
if err != nil {
return nil, err
}
snapshot := request.ClusterMetadata.GetClusterSnapshot()

shards, err := f.shardPicker.PickShards(ctx, request.Cluster.Name(), 1, false)
if err != nil {
log.Error("pick table shard", zap.Error(err))
Expand All @@ -129,15 +131,14 @@ func (f *Factory) makeCreateTableProcedure(ctx context.Context, request CreateTa
}

procedure := createtable.NewProcedure(createtable.ProcedureRequest{
Dispatch: f.dispatch,
Cluster: request.Cluster,
ID: id,
ShardID: shards[0].ShardInfo.ID,
ShardVersion: shards[0].ShardInfo.Version,
ClusterVersion: request.Cluster.GetClusterViewVersion(),
Req: request.SourceReq,
OnSucceeded: request.OnSucceeded,
OnFailed: request.OnFailed,
Dispatch: f.dispatch,
ClusterMetadata: request.ClusterMetadata,
ClusterSnapshot: snapshot,
ID: id,
ShardID: shards[0].ShardInfo.ID,
Req: request.SourceReq,
OnSucceeded: request.OnSucceeded,
OnFailed: request.OnFailed,
})
return procedure, nil
}
Expand Down Expand Up @@ -205,7 +206,7 @@ func (f *Factory) CreateDropTableProcedure(ctx context.Context, request DropTabl
return procedure, nil
}

procedure := droptable.NewDropTableProcedure(f.dispatch, request.Cluster, id,
procedure := droptable.NewDropTableProcedure(f.dispatch, request.ClusterMetadata, id,
request.SourceReq, request.OnSucceeded, request.OnFailed)
return procedure, nil
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0.

package procedure
package ddl

import (
"context"
Expand All @@ -9,6 +9,7 @@ import (
"github.com/CeresDB/ceresdbproto/golang/pkg/metaservicepb"
"github.com/CeresDB/ceresmeta/server/cluster/metadata"
"github.com/CeresDB/ceresmeta/server/coordinator/eventdispatch"
"github.com/CeresDB/ceresmeta/server/coordinator/procedure"
"github.com/CeresDB/ceresmeta/server/storage"
"github.com/pkg/errors"
)
Expand All @@ -19,7 +20,7 @@ func CreateTableMetadata(ctx context.Context, c *metadata.ClusterMetadata, schem
return metadata.CreateTableResult{}, errors.WithMessage(err, "cluster get table")
}
if exists {
return metadata.CreateTableResult{}, errors.WithMessagef(ErrTableAlreadyExists, "create an existing table, schemaName:%s, tableName:%s", schemaName, tableName)
return metadata.CreateTableResult{}, errors.WithMessagef(procedure.ErrTableAlreadyExists, "create an existing table, schemaName:%s, tableName:%s", schemaName, tableName)
}

createTableResult, err := c.CreateTable(ctx, metadata.CreateTableRequest{
Expand Down Expand Up @@ -50,7 +51,7 @@ func CreateTableOnShard(ctx context.Context, c *metadata.ClusterMetadata, dispat
}
}
if !found {
return errors.WithMessagef(ErrShardLeaderNotFound, "shard node can't find leader, shardID:%d", shardID)
return errors.WithMessagef(procedure.ErrShardLeaderNotFound, "shard node can't find leader, shardID:%d", shardID)
}

err = dispatch.CreateTableOnShard(ctx, leader.NodeName, request)
Expand Down
Loading

0 comments on commit 1dadcb1

Please sign in to comment.