From cec9fe04b8e361f141840e9b79041ec5c714c8a8 Mon Sep 17 00:00:00 2001
From: CooooolFrog
Date: Wed, 6 Dec 2023 16:29:10 +0800
Subject: [PATCH 01/13] fix: fix integration tests on dev branch (#287)

## Rationale
Currently the tests on the dev branch are not running properly.

## Detailed Changes
* Cherry-pick some changes from the main branch.
* Fix the integration tests.

## Test Plan
Pass CI.
---
 server/coordinator/procedure/storage_impl.go | 29 +++++++++-----------
 1 file changed, 13 insertions(+), 16 deletions(-)

diff --git a/server/coordinator/procedure/storage_impl.go b/server/coordinator/procedure/storage_impl.go
index 3d60cd05..95a0c2d2 100644
--- a/server/coordinator/procedure/storage_impl.go
+++ b/server/coordinator/procedure/storage_impl.go
@@ -1,20 +1,17 @@
 /*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
+ * Copyright 2022 The CeresDB Authors
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
  *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/ package procedure @@ -27,8 +24,8 @@ import ( "path" "strconv" - "github.com/apache/incubator-horaedb-meta/pkg/log" - "github.com/apache/incubator-horaedb-meta/server/etcdutil" + "github.com/CeresDB/ceresmeta/pkg/log" + "github.com/CeresDB/ceresmeta/server/etcdutil" "github.com/pkg/errors" clientv3 "go.etcd.io/etcd/client/v3" "go.etcd.io/etcd/client/v3/clientv3util" From 0cd9048b5b8ad0b9824b3aecd3e47db54b137864 Mon Sep 17 00:00:00 2001 From: CooooolFrog Date: Mon, 11 Dec 2023 11:41:55 +0800 Subject: [PATCH 02/13] add assign table to storage --- server/cluster/metadata/cluster_metadata.go | 15 ++++- server/cluster/metadata/topology_manager.go | 73 ++++++++++++++++++++- server/etcdutil/util.go | 47 ++++++++++--- server/storage/key_path.go | 14 ++++ server/storage/meta.go | 7 ++ server/storage/storage_impl.go | 67 +++++++++++++++++++ server/storage/types.go | 27 ++++++++ 7 files changed, 237 insertions(+), 13 deletions(-) diff --git a/server/cluster/metadata/cluster_metadata.go b/server/cluster/metadata/cluster_metadata.go index ab9b293a..7c72e623 100644 --- a/server/cluster/metadata/cluster_metadata.go +++ b/server/cluster/metadata/cluster_metadata.go @@ -115,7 +115,8 @@ func (c *ClusterMetadata) Load(ctx context.Context) error { return errors.WithMessage(err, "load table manager") } - if err := c.topologyManager.Load(ctx); err != nil { + schemas := c.tableManager.GetSchemas() + if err := c.topologyManager.Load(ctx, schemas); err != nil { return errors.WithMessage(err, "load topology manager") } @@ -392,6 +393,18 @@ func (c *ClusterMetadata) CreateTable(ctx context.Context, request CreateTableRe return ret, nil } +func (c *ClusterMetadata) GetAssignTable(ctx context.Context, schemaID storage.SchemaID, tableName string) (storage.ShardID, bool) { + return c.topologyManager.GetAssignTableResult(ctx, schemaID, tableName) +} + +func (c *ClusterMetadata) AssignTable(ctx context.Context, schemaID storage.SchemaID, tableName string, shardID storage.ShardID) error { + return c.topologyManager.AssignTable(ctx, schemaID, tableName, shardID) +} + +func (c *ClusterMetadata) DeleteAssignTable(ctx context.Context, schemaID storage.SchemaID, tableName string) error { + return c.topologyManager.DeleteAssignTable(ctx, schemaID, tableName) +} + func (c *ClusterMetadata) GetShards() []storage.ShardID { return c.topologyManager.GetShards() } diff --git a/server/cluster/metadata/topology_manager.go b/server/cluster/metadata/topology_manager.go index 2426bb0b..f5f99935 100644 --- a/server/cluster/metadata/topology_manager.go +++ b/server/cluster/metadata/topology_manager.go @@ -33,7 +33,7 @@ import ( // TopologyManager manages the cluster topology, including the mapping relationship between shards, nodes, and tables. type TopologyManager interface { // Load load cluster topology from storage. - Load(ctx context.Context) error + Load(ctx context.Context, schemas []storage.Schema) error // GetVersion get cluster view version. GetVersion() uint64 // GetClusterState get cluster view state. @@ -44,6 +44,12 @@ type TopologyManager interface { AddTable(ctx context.Context, shardID storage.ShardID, latestVersion uint64, tables []storage.Table) error // RemoveTable remove table on target shards from cluster topology. RemoveTable(ctx context.Context, shardID storage.ShardID, latestVersion uint64, tableIDs []storage.TableID) error + // AssignTable persistent table shard mapping, it is used to store assign results and make the table creation idempotent. 
+ AssignTable(ctx context.Context, schemaID storage.SchemaID, tableName string, shardID storage.ShardID) error + // GetAssignTableResult get table assign result. + GetAssignTableResult(ctx context.Context, schemaID storage.SchemaID, tableName string) (storage.ShardID, bool) + // DeleteAssignTable delete table assign result. + DeleteAssignTable(ctx context.Context, schemaID storage.SchemaID, tableName string) error // GetShards get all shards in cluster topology. GetShards() []storage.ShardID // GetShardNodesByID get shardNodes with shardID. @@ -133,6 +139,8 @@ type TopologyManagerImpl struct { // ShardView in memory. shardTablesMapping map[storage.ShardID]*storage.ShardView // ShardID -> shardTopology tableShardMapping map[storage.TableID][]storage.ShardID // tableID -> ShardID + // Table assign result in memory. + tableAssignMapping map[storage.SchemaID]map[string]storage.ShardID // tableName -> shardID nodes map[string]storage.Node // NodeName in memory. } @@ -154,7 +162,7 @@ func NewTopologyManagerImpl(logger *zap.Logger, storage storage.Storage, cluster } } -func (m *TopologyManagerImpl) Load(ctx context.Context) error { +func (m *TopologyManagerImpl) Load(ctx context.Context, schemas []storage.Schema) error { m.lock.Lock() defer m.lock.Unlock() @@ -169,6 +177,11 @@ func (m *TopologyManagerImpl) Load(ctx context.Context) error { if err := m.loadNodes(ctx); err != nil { return errors.WithMessage(err, "load nodes") } + + if err := m.loadAssignTable(ctx, schemas); err != nil { + return errors.WithMessage(err, "load assign table") + } + return nil } @@ -294,6 +307,48 @@ func (m *TopologyManagerImpl) RemoveTable(ctx context.Context, shardID storage.S return nil } +func (m *TopologyManagerImpl) AssignTable(ctx context.Context, schemaID storage.SchemaID, tableName string, shardID storage.ShardID) error { + m.lock.RLock() + defer m.lock.RUnlock() + + if err := m.storage.AssignTable(ctx, storage.AssignTableRequest{ + ClusterID: m.clusterID, + SchemaID: schemaID, + TableName: tableName, + ShardID: shardID, + }); err != nil { + return errors.WithMessage(err, "storage assign table") + } + + // Update cache im memory. + m.tableAssignMapping[schemaID][tableName] = shardID + + return nil +} + +func (m *TopologyManagerImpl) GetAssignTableResult(_ context.Context, schemaID storage.SchemaID, tableName string) (storage.ShardID, bool) { + assignResult, exists := m.tableAssignMapping[schemaID][tableName] + return assignResult, exists +} + +func (m *TopologyManagerImpl) DeleteAssignTable(ctx context.Context, schemaID storage.SchemaID, tableName string) error { + m.lock.RLock() + defer m.lock.RUnlock() + + if err := m.storage.DeleteAssignTable(ctx, storage.DeleteAssignTableRequest{ + ClusterID: m.clusterID, + SchemaID: schemaID, + TableName: tableName, + }); err != nil { + return errors.WithMessage(err, "storage delete assign table") + } + + // Update cache im memory. 
+ delete(m.tableAssignMapping[schemaID], tableName) + + return nil +} + func (m *TopologyManagerImpl) GetShards() []storage.ShardID { m.lock.RLock() defer m.lock.RUnlock() @@ -584,6 +639,20 @@ func (m *TopologyManagerImpl) loadShardViews(ctx context.Context) error { return nil } +func (m *TopologyManagerImpl) loadAssignTable(ctx context.Context, schemas []storage.Schema) error { + for _, schema := range schemas { + listAssignTableResult, err := m.storage.ListAssignTable(ctx, storage.ListAssignTableRequest{ClusterID: m.clusterID, SchemaID: schema.ID}) + if err != nil { + return errors.WithMessage(err, "storage list assign table") + } + for _, assignTable := range listAssignTableResult.TableAssigns { + m.tableAssignMapping[schema.ID][assignTable.TableName] = assignTable.ShardID + } + } + + return nil +} + func (m *TopologyManagerImpl) loadNodes(ctx context.Context) error { nodesResult, err := m.storage.ListNodes(ctx, storage.ListNodesRequest{ClusterID: m.clusterID}) if err != nil { diff --git a/server/etcdutil/util.go b/server/etcdutil/util.go index 5d29d805..6ec1f062 100644 --- a/server/etcdutil/util.go +++ b/server/etcdutil/util.go @@ -1,11 +1,9 @@ /* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Copyright 2022 The CeresDB Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * @@ -21,10 +19,10 @@ package etcdutil import ( "context" - - "github.com/apache/incubator-horaedb-meta/pkg/log" + "github.com/CeresDB/ceresmeta/pkg/log" clientv3 "go.etcd.io/etcd/client/v3" "go.uber.org/zap" + "path" ) func Get(ctx context.Context, client *clientv3.Client, key string) (string, error) { @@ -118,7 +116,7 @@ func Scan(ctx context.Context, client *clientv3.Client, startKey, endKey string, } } - // Check whether the keys is exhausted. + // Check whether the keys are exhausted. if len(resp.Kvs) < batchSize { return nil } @@ -126,3 +124,32 @@ func Scan(ctx context.Context, client *clientv3.Client, startKey, endKey string, lastKeyInPrevBatch = string(resp.Kvs[len(resp.Kvs)-1].Key) } } + +func ScanWithPrefix(ctx context.Context, client *clientv3.Client, prefix string, batchSize int, do func(key string, val []byte) error) error { + rangeEnd := clientv3.GetPrefixRangeEnd(prefix) + + for { + resp, err := client.Get(ctx, prefix, clientv3.WithRange(rangeEnd), clientv3.WithLimit(int64(batchSize))) + if err != nil { + return ErrEtcdKVGet.WithCause(err) + } + // Check whether the keys are exhausted. 
+		if len(resp.Kvs) == 0 {
+			return nil
+		}
+
+		for _, item := range resp.Kvs {
+			err := do(string(item.Key), item.Value)
+			if err != nil {
+				return err
+			}
+		}
+
+		rangeEnd = string(resp.Kvs[len(resp.Kvs)-1].Key)
+	}
+}
+
+// GetLastPathSegment gets the last segment of the given path.
+func GetLastPathSegment(completePath string) string {
+	return path.Base(path.Clean(completePath))
+}
diff --git a/server/storage/key_path.go b/server/storage/key_path.go
index 6036e59f..4627bc0b 100644
--- a/server/storage/key_path.go
+++ b/server/storage/key_path.go
@@ -36,6 +36,7 @@ const (
 	shardView     = "shard_view"
 	latestVersion = "latest_version"
 	info          = "info"
+	tableAssign   = "table_assign"
 )
 
 // makeSchemaKey returns the key path to the schema meta info.
@@ -128,6 +129,19 @@ func makeNameToIDKey(rootPath string, clusterID uint32, schemaID uint32, tableNa
 	return path.Join(rootPath, version, cluster, fmtID(uint64(clusterID)), schema, fmtID(uint64(schemaID)), tableNameToID, tableName)
 }
 
+// makeTableAssignKey returns the tableAssign key path.
+func makeTableAssignKey(rootPath string, clusterID uint32, schemaID uint32, tableName string) string {
+	// Example:
+	//	v1/cluster/1/schema/1/table_assign/tableName1 -> shardID1
+	//	v1/cluster/1/schema/1/table_assign/tableName2 -> shardID2
+	return path.Join(rootPath, version, cluster, fmtID(uint64(clusterID)), schema, fmtID(uint64(schemaID)), tableAssign, tableName)
+}
+
+// makeTableAssignPrefixKey returns the tableAssign prefix key path.
+func makeTableAssignPrefixKey(rootPath string, clusterID uint32, schemaID uint32) string {
+	return path.Join(rootPath, version, cluster, fmtID(uint64(clusterID)), schema, fmtID(uint64(schemaID)), tableAssign)
+}
+
 func fmtID(id uint64) string {
 	return fmt.Sprintf("%020d", id)
 }
diff --git a/server/storage/meta.go b/server/storage/meta.go
index 842995d5..563f601e 100644
--- a/server/storage/meta.go
+++ b/server/storage/meta.go
@@ -57,6 +57,13 @@ type Storage interface {
 	// DeleteTable delete table by table name in specified cluster and schema.
 	DeleteTable(ctx context.Context, req DeleteTableRequest) error
 
+	// AssignTable save table assign result.
+	AssignTable(ctx context.Context, req AssignTableRequest) error
+	// DeleteAssignTable delete table assign result.
+	DeleteAssignTable(ctx context.Context, req DeleteAssignTableRequest) error
+	// ListAssignTable list table assign results.
+	ListAssignTable(ctx context.Context, req ListAssignTableRequest) (ListAssignTableResult, error)
+
 	// CreateShardViews create shard views in specified cluster.
 	CreateShardViews(ctx context.Context, req CreateShardViewsRequest) error
 	// ListShardViews list all shard views in specified cluster.
diff --git a/server/storage/storage_impl.go b/server/storage/storage_impl.go
index 8aaafdd8..60fb8080 100644
--- a/server/storage/storage_impl.go
+++ b/server/storage/storage_impl.go
@@ -418,6 +418,73 @@ func (s *metaStorageImpl) DeleteTable(ctx context.Context, req DeleteTableReques
 	return nil
 }
 
+func (s *metaStorageImpl) AssignTable(ctx context.Context, req AssignTableRequest) error {
+	key := makeTableAssignKey(s.rootPath, uint32(req.ClusterID), uint32(req.SchemaID), req.TableName)
+
+	// Check whether the key exists: if not, save the table assign result; otherwise the assign result already exists, so return an error.
+	keyMissing := clientv3util.KeyMissing(key)
+	opCreateAssignTable := clientv3.OpPut(key, strconv.Itoa(int(req.ShardID)))
+
+	resp, err := s.client.Txn(ctx).
+		If(keyMissing).
+		Then(opCreateAssignTable).
+		Commit()
+	if err != nil {
+		return errors.WithMessagef(err, "create assign table, clusterID:%d, schemaID:%d, key:%s", req.ClusterID, req.SchemaID, key)
+	}
+	if !resp.Succeeded {
+		return ErrCreateSchemaAgain.WithCausef("assign table may already exist, clusterID:%d, schemaID:%d, key:%s, resp:%v", req.ClusterID, req.SchemaID, key, resp)
+	}
+
+	return nil
+}
+
+func (s *metaStorageImpl) DeleteAssignTable(ctx context.Context, req DeleteAssignTableRequest) error {
+	key := makeTableAssignKey(s.rootPath, uint32(req.ClusterID), uint32(req.SchemaID), req.TableName)
+
+	keyExists := clientv3util.KeyExists(key)
+	opDeleteAssignTable := clientv3.OpDelete(key)
+
+	resp, err := s.client.Txn(ctx).
+		If(keyExists).
+		Then(opDeleteAssignTable).
+		Commit()
+	if err != nil {
+		return errors.WithMessagef(err, "delete assign table, clusterID:%d, schemaID:%d, tableName:%s", req.ClusterID, req.SchemaID, req.TableName)
+	}
+	if !resp.Succeeded {
+		return ErrDeleteTableAgain.WithCausef("assign table may have been deleted, clusterID:%d, schemaID:%d, tableName:%s", req.ClusterID, req.SchemaID, req.TableName)
+	}
+
+	return nil
+}
+
+func (s *metaStorageImpl) ListAssignTable(ctx context.Context, req ListAssignTableRequest) (ListAssignTableResult, error) {
+	key := makeTableAssignPrefixKey(s.rootPath, uint32(req.ClusterID), uint32(req.SchemaID))
+	rangeLimit := s.opts.MaxScanLimit
+
+	var tableAssigns []TableAssign
+	do := func(key string, value []byte) error {
+		tableName := etcdutil.GetLastPathSegment(key)
+		shardIDStr := string(value)
+		shardID, err := strconv.ParseUint(shardIDStr, 10, 32)
+		if err != nil {
+			return err
+		}
+		tableAssigns = append(tableAssigns, TableAssign{
+			TableName: tableName,
+			ShardID:   ShardID(shardID),
+		})
+		return nil
+	}
+
+	if err := etcdutil.ScanWithPrefix(ctx, s.client, key, rangeLimit, do); err != nil {
+		return ListAssignTableResult{}, errors.WithMessagef(err, "scan tables, clusterID:%d, schemaID:%d, prefix key:%s, range limit:%d", req.ClusterID, req.SchemaID, key, rangeLimit)
+	}
+
+	return ListAssignTableResult{TableAssigns: tableAssigns}, nil
+}
+
 func (s *metaStorageImpl) createNShardViews(ctx context.Context, clusterID ClusterID, shardViews []ShardView, ifConds []clientv3.Cmp, opCreates []clientv3.Op) error {
 	for _, shardView := range shardViews {
 		shardViewPB := convertShardViewToPB(shardView)
diff --git a/server/storage/types.go b/server/storage/types.go
index baff5e2e..62295499 100644
--- a/server/storage/types.go
+++ b/server/storage/types.go
@@ -141,6 +141,28 @@ type DeleteTableRequest struct {
 	TableName string
 }
 
+type AssignTableRequest struct {
+	ClusterID ClusterID
+	SchemaID  SchemaID
+	TableName string
+	ShardID   ShardID
+}
+
+type DeleteAssignTableRequest struct {
+	ClusterID ClusterID
+	SchemaID  SchemaID
+	TableName string
+}
+
+type ListAssignTableRequest struct {
+	ClusterID ClusterID
+	SchemaID  SchemaID
+}
+
+type ListAssignTableResult struct {
+	TableAssigns []TableAssign
+}
+
 type CreateShardViewsRequest struct {
 	ClusterID  ClusterID
 	ShardViews []ShardView
@@ -232,6 +254,11 @@ func (t Table) IsPartitioned() bool {
 	return t.PartitionInfo.Info != nil
 }
 
+type TableAssign struct {
+	TableName string
+	ShardID   ShardID
+}
+
 type ShardView struct {
 	ShardID ShardID
 	Version uint64

From 76973f53f56ef2dbb20a68e613af0c1eb3fe1f57 Mon Sep 17 00:00:00 2001
From: CooooolFrog
Date: Tue, 12 Dec 2023 11:45:20 +0800
Subject: [PATCH 03/13] pass shardID by factory

---
 server/cluster/metadata/cluster_metadata.go |  8 +++++--
 server/coordinator/factory.go               | 25 +++++++++++++--------
 2
files changed, 22 insertions(+), 11 deletions(-) diff --git a/server/cluster/metadata/cluster_metadata.go b/server/cluster/metadata/cluster_metadata.go index 7c72e623..b8eb9bd8 100644 --- a/server/cluster/metadata/cluster_metadata.go +++ b/server/cluster/metadata/cluster_metadata.go @@ -393,8 +393,12 @@ func (c *ClusterMetadata) CreateTable(ctx context.Context, request CreateTableRe return ret, nil } -func (c *ClusterMetadata) GetAssignTable(ctx context.Context, schemaID storage.SchemaID, tableName string) (storage.ShardID, bool) { - return c.topologyManager.GetAssignTableResult(ctx, schemaID, tableName) +func (c *ClusterMetadata) GetAssignTable(ctx context.Context, schemaName string, tableName string) (storage.ShardID, bool) { + schema, exists := c.tableManager.GetSchema(schemaName) + if !exists { + return 0, false + } + return c.topologyManager.GetAssignTableResult(ctx, schema.ID, tableName) } func (c *ClusterMetadata) AssignTable(ctx context.Context, schemaID storage.SchemaID, tableName string, shardID storage.ShardID) error { diff --git a/server/coordinator/factory.go b/server/coordinator/factory.go index a830fe3b..c6e45229 100644 --- a/server/coordinator/factory.go +++ b/server/coordinator/factory.go @@ -129,14 +129,21 @@ func (f *Factory) makeCreateTableProcedure(ctx context.Context, request CreateTa } snapshot := request.ClusterMetadata.GetClusterSnapshot() - shards, err := f.shardPicker.PickShards(ctx, snapshot, 1) - if err != nil { - f.logger.Error("pick table shard", zap.Error(err)) - return nil, errors.WithMessage(err, "pick table shard") - } - if len(shards) != 1 { - f.logger.Error("pick table shards length not equal 1", zap.Int("shards", len(shards))) - return nil, errors.WithMessagef(procedure.ErrPickShard, "pick table shard, shards length:%d", len(shards)) + var targetShardID storage.ShardID + shardID, exists := request.ClusterMetadata.GetAssignTable(ctx, request.SourceReq.SchemaName, request.SourceReq.Name) + if exists { + targetShardID = shardID + } else { + shards, err := f.shardPicker.PickShards(ctx, snapshot, 1) + if err != nil { + f.logger.Error("pick table shard", zap.Error(err)) + return nil, errors.WithMessage(err, "pick table shard") + } + if len(shards) != 1 { + f.logger.Error("pick table shards length not equal 1", zap.Int("shards", len(shards))) + return nil, errors.WithMessagef(procedure.ErrPickShard, "pick table shard, shards length:%d", len(shards)) + } + targetShardID = shards[0].ID } return createtable.NewProcedure(createtable.ProcedureParams{ @@ -144,7 +151,7 @@ func (f *Factory) makeCreateTableProcedure(ctx context.Context, request CreateTa ClusterMetadata: request.ClusterMetadata, ClusterSnapshot: snapshot, ID: id, - ShardID: shards[0].ID, + ShardID: targetShardID, SourceReq: request.SourceReq, OnSucceeded: request.OnSucceeded, OnFailed: request.OnFailed, From c7e24d5ad086bda1dbd80313210cb49af645d1b4 Mon Sep 17 00:00:00 2001 From: CooooolFrog Date: Mon, 18 Dec 2023 16:21:50 +0800 Subject: [PATCH 04/13] make create table idempotent --- server/cluster/metadata/cluster_metadata.go | 28 +++-- server/cluster/metadata/topology_manager.go | 29 ++++- server/coordinator/factory.go | 5 +- .../procedure/ddl/createtable/create_table.go | 118 +++++++++++++++--- server/coordinator/procedure/error.go | 1 + 5 files changed, 156 insertions(+), 25 deletions(-) diff --git a/server/cluster/metadata/cluster_metadata.go b/server/cluster/metadata/cluster_metadata.go index b8eb9bd8..38e44785 100644 --- a/server/cluster/metadata/cluster_metadata.go +++ 
b/server/cluster/metadata/cluster_metadata.go @@ -280,6 +280,11 @@ func (c *ClusterMetadata) GetTable(schemaName, tableName string) (storage.Table, return c.tableManager.GetTable(schemaName, tableName) } +// GetTableShard get the shard where the table actually exists. +func (c *ClusterMetadata) GetTableShard(ctx context.Context, table storage.Table) (storage.ShardID, bool) { + return c.topologyManager.GetTableShard(ctx, table) +} + func (c *ClusterMetadata) CreateTableMetadata(ctx context.Context, request CreateTableMetadataRequest) (CreateTableMetadataResult, error) { c.logger.Info("create table start", zap.String("cluster", c.Name()), zap.String("schemaName", request.SchemaName), zap.String("tableName", request.TableName)) @@ -393,20 +398,29 @@ func (c *ClusterMetadata) CreateTable(ctx context.Context, request CreateTableRe return ret, nil } -func (c *ClusterMetadata) GetAssignTable(ctx context.Context, schemaName string, tableName string) (storage.ShardID, bool) { +func (c *ClusterMetadata) GetAssignTable(ctx context.Context, schemaName string, tableName string) (storage.ShardID, bool, error) { schema, exists := c.tableManager.GetSchema(schemaName) if !exists { - return 0, false + return 0, false, errors.WithMessagef(ErrSchemaNotFound, "schema %s not found", schemaName) } - return c.topologyManager.GetAssignTableResult(ctx, schema.ID, tableName) + shardIDs, exists := c.topologyManager.GetAssignTableResult(ctx, schema.ID, tableName) + return shardIDs, exists, nil } -func (c *ClusterMetadata) AssignTable(ctx context.Context, schemaID storage.SchemaID, tableName string, shardID storage.ShardID) error { - return c.topologyManager.AssignTable(ctx, schemaID, tableName, shardID) +func (c *ClusterMetadata) AssignTable(ctx context.Context, schemaName string, tableName string, shardID storage.ShardID) error { + schema, exists := c.tableManager.GetSchema(schemaName) + if !exists { + return errors.WithMessagef(ErrSchemaNotFound, "schema %s not found", schemaName) + } + return c.topologyManager.AssignTable(ctx, schema.ID, tableName, shardID) } -func (c *ClusterMetadata) DeleteAssignTable(ctx context.Context, schemaID storage.SchemaID, tableName string) error { - return c.topologyManager.DeleteAssignTable(ctx, schemaID, tableName) +func (c *ClusterMetadata) DeleteAssignTable(ctx context.Context, schemaName string, tableName string) error { + schema, exists := c.tableManager.GetSchema(schemaName) + if !exists { + return errors.WithMessagef(ErrSchemaNotFound, "schema %s not found", schemaName) + } + return c.topologyManager.DeleteAssignTable(ctx, schema.ID, tableName) } func (c *ClusterMetadata) GetShards() []storage.ShardID { diff --git a/server/cluster/metadata/topology_manager.go b/server/cluster/metadata/topology_manager.go index f5f99935..7719047c 100644 --- a/server/cluster/metadata/topology_manager.go +++ b/server/cluster/metadata/topology_manager.go @@ -44,6 +44,8 @@ type TopologyManager interface { AddTable(ctx context.Context, shardID storage.ShardID, latestVersion uint64, tables []storage.Table) error // RemoveTable remove table on target shards from cluster topology. RemoveTable(ctx context.Context, shardID storage.ShardID, latestVersion uint64, tableIDs []storage.TableID) error + // GetTableShard get the shardID of the shard where the table is located. + GetTableShard(ctx context.Context, table storage.Table) (storage.ShardID, bool) // AssignTable persistent table shard mapping, it is used to store assign results and make the table creation idempotent. 
AssignTable(ctx context.Context, schemaID storage.SchemaID, tableName string, shardID storage.ShardID) error // GetAssignTableResult get table assign result. @@ -158,6 +160,7 @@ func NewTopologyManagerImpl(logger *zap.Logger, storage storage.Storage, cluster nodeShardsMapping: nil, shardTablesMapping: nil, tableShardMapping: nil, + tableAssignMapping: nil, nodes: nil, } } @@ -307,10 +310,22 @@ func (m *TopologyManagerImpl) RemoveTable(ctx context.Context, shardID storage.S return nil } -func (m *TopologyManagerImpl) AssignTable(ctx context.Context, schemaID storage.SchemaID, tableName string, shardID storage.ShardID) error { +func (m *TopologyManagerImpl) GetTableShard(_ context.Context, table storage.Table) (storage.ShardID, bool) { m.lock.RLock() defer m.lock.RUnlock() + shardIDs, exists := m.tableShardMapping[table.ID] + if exists { + return shardIDs[0], true + } + + return 0, false +} + +func (m *TopologyManagerImpl) AssignTable(ctx context.Context, schemaID storage.SchemaID, tableName string, shardID storage.ShardID) error { + m.lock.Lock() + defer m.lock.Unlock() + if err := m.storage.AssignTable(ctx, storage.AssignTableRequest{ ClusterID: m.clusterID, SchemaID: schemaID, @@ -321,6 +336,10 @@ func (m *TopologyManagerImpl) AssignTable(ctx context.Context, schemaID storage. } // Update cache im memory. + if _, exists := m.tableAssignMapping[schemaID]; !exists { + m.tableAssignMapping[schemaID] = make(map[string]storage.ShardID, 0) + } + m.tableAssignMapping[schemaID][tableName] = shardID return nil @@ -332,8 +351,8 @@ func (m *TopologyManagerImpl) GetAssignTableResult(_ context.Context, schemaID s } func (m *TopologyManagerImpl) DeleteAssignTable(ctx context.Context, schemaID storage.SchemaID, tableName string) error { - m.lock.RLock() - defer m.lock.RUnlock() + m.lock.Lock() + defer m.lock.Unlock() if err := m.storage.DeleteAssignTable(ctx, storage.DeleteAssignTableRequest{ ClusterID: m.clusterID, @@ -640,11 +659,15 @@ func (m *TopologyManagerImpl) loadShardViews(ctx context.Context) error { } func (m *TopologyManagerImpl) loadAssignTable(ctx context.Context, schemas []storage.Schema) error { + m.tableAssignMapping = make(map[storage.SchemaID]map[string]storage.ShardID, len(schemas)) for _, schema := range schemas { + m.tableAssignMapping[schema.ID] = make(map[string]storage.ShardID, 0) + listAssignTableResult, err := m.storage.ListAssignTable(ctx, storage.ListAssignTableRequest{ClusterID: m.clusterID, SchemaID: schema.ID}) if err != nil { return errors.WithMessage(err, "storage list assign table") } + //m.tableAssignMapping[schema.ID] = make(map[string]storage.ShardID, len(listAssignTableResult.TableAssigns)) for _, assignTable := range listAssignTableResult.TableAssigns { m.tableAssignMapping[schema.ID][assignTable.TableName] = assignTable.ShardID } diff --git a/server/coordinator/factory.go b/server/coordinator/factory.go index c6e45229..561f7581 100644 --- a/server/coordinator/factory.go +++ b/server/coordinator/factory.go @@ -130,7 +130,10 @@ func (f *Factory) makeCreateTableProcedure(ctx context.Context, request CreateTa snapshot := request.ClusterMetadata.GetClusterSnapshot() var targetShardID storage.ShardID - shardID, exists := request.ClusterMetadata.GetAssignTable(ctx, request.SourceReq.SchemaName, request.SourceReq.Name) + shardID, exists, err := request.ClusterMetadata.GetAssignTable(ctx, request.SourceReq.SchemaName, request.SourceReq.Name) + if err != nil { + return nil, err + } if exists { targetShardID = shardID } else { diff --git 
a/server/coordinator/procedure/ddl/createtable/create_table.go b/server/coordinator/procedure/ddl/createtable/create_table.go index 49138a44..63dc3ad5 100644 --- a/server/coordinator/procedure/ddl/createtable/create_table.go +++ b/server/coordinator/procedure/ddl/createtable/create_table.go @@ -37,30 +37,38 @@ import ( ) const ( - eventCreateMetadata = "EventCreateMetadata" - eventCreateOnShard = "EventCreateOnShard" - eventFinish = "EventFinish" - - stateBegin = "StateBegin" - stateCreateMetadata = "StateCreateMetadata" - stateCreateOnShard = "StateCreateOnShard" - stateFinish = "StateFinish" + eventCheckTableExists = "EventCheckTableExists" + eventCreateTableAssign = "EventCreateTableAssign" + eventCreateMetadata = "EventCreateMetadata" + eventCreateOnShard = "EventCreateOnShard" + eventFinish = "EventFinish" + + stateBegin = "StateBegin" + stateCheckTableExists = "StateCheckTableExists" + stateCreateTableAssign = "StateCreateTableAssign" + stateCreateMetadata = "StateCreateMetadata" + stateCreateOnShard = "StateCreateOnShard" + stateFinish = "StateFinish" ) var ( createTableEvents = fsm.Events{ - {Name: eventCreateMetadata, Src: []string{stateBegin}, Dst: stateCreateMetadata}, + {Name: eventCheckTableExists, Src: []string{stateBegin}, Dst: stateCheckTableExists}, + {Name: eventCreateTableAssign, Src: []string{stateCheckTableExists}, Dst: stateCreateTableAssign}, + {Name: eventCreateMetadata, Src: []string{stateCreateTableAssign}, Dst: stateCreateMetadata}, {Name: eventCreateOnShard, Src: []string{stateCreateMetadata}, Dst: stateCreateOnShard}, {Name: eventFinish, Src: []string{stateCreateOnShard}, Dst: stateFinish}, } createTableCallbacks = fsm.Callbacks{ - eventCreateMetadata: createMetadataCallback, - eventCreateOnShard: createOnShard, - eventFinish: createFinish, + eventCheckTableExists: checkTableExists, + eventCreateTableAssign: createTableAssign, + eventCreateMetadata: createMetadata, + eventCreateOnShard: createOnShard, + eventFinish: createFinish, } ) -func createMetadataCallback(event *fsm.Event) { +func checkTableExists(event *fsm.Event) { req, err := procedure.GetRequestFromEvent[*callbackRequest](event) if err != nil { procedure.CancelEventWithLog(event, err, "get request from event") @@ -68,6 +76,74 @@ func createMetadataCallback(event *fsm.Event) { } params := req.p.params + // Check whether the table metadata already exists. + table, exists, err := params.ClusterMetadata.GetTable(params.SourceReq.GetSchemaName(), params.SourceReq.GetName()) + if err != nil { + procedure.CancelEventWithLog(event, err, "get table metadata") + return + } + if !exists { + return + } + + // Check whether the table shard mapping already exists. 
+ _, exists = params.ClusterMetadata.GetTableShard(req.ctx, table) + if exists { + procedure.CancelEventWithLog(event, metadata.ErrTableAlreadyExists, "table already exists") + return + } +} + +func createTableAssign(event *fsm.Event) { + req, err := procedure.GetRequestFromEvent[*callbackRequest](event) + if err != nil { + procedure.CancelEventWithLog(event, err, "get request from event") + return + } + params := req.p.params + + schemaName := params.SourceReq.GetSchemaName() + tableName := params.SourceReq.GetName() + + targetShardID, exists, err := params.ClusterMetadata.GetAssignTable(req.ctx, schemaName, tableName) + if err != nil { + procedure.CancelEventWithLog(event, err, "get table assign", zap.String("schemaName", schemaName), zap.String("tableName", tableName)) + return + } + if exists { + if targetShardID != params.ShardID { + procedure.CancelEventWithLog(event, procedure.ErrShardNotMatch, "target shard not match to persist data", zap.String("schemaName", schemaName), zap.String("tableName", tableName), zap.Uint32("targetShardID", uint32(targetShardID)), zap.Uint32("persistShardID", uint32(params.ShardID))) + return + } + return + } + + if err := params.ClusterMetadata.AssignTable(req.ctx, schemaName, tableName, params.ShardID); err != nil { + procedure.CancelEventWithLog(event, err, "persist table assign") + return + } + + log.Debug("create table assign finish", zap.String("schemaName", schemaName), zap.String("tableName", tableName)) +} + +func createMetadata(event *fsm.Event) { + req, err := procedure.GetRequestFromEvent[*callbackRequest](event) + if err != nil { + procedure.CancelEventWithLog(event, err, "get request from event") + return + } + params := req.p.params + + _, exists, err := params.ClusterMetadata.GetTable(params.SourceReq.GetSchemaName(), params.SourceReq.GetName()) + if err != nil { + procedure.CancelEventWithLog(event, err, "get table metadata") + return + } + if exists { + log.Info("table metadata already exists", zap.String("schemaName", params.SourceReq.GetSchemaName()), zap.String("tableName", params.SourceReq.GetName())) + return + } + createTableMetadataRequest := metadata.CreateTableMetadataRequest{ SchemaName: params.SourceReq.GetSchemaName(), TableName: params.SourceReq.GetName(), @@ -139,6 +215,11 @@ func createFinish(event *fsm.Event) { procedure.CancelEventWithLog(event, err, "get request from event") return } + params := req.p.params + + if err := req.p.params.ClusterMetadata.DeleteAssignTable(req.ctx, params.SourceReq.GetSchemaName(), params.SourceReq.GetName()); err != nil { + log.Warn("delete assign table failed", zap.String("schemaName", params.SourceReq.GetSchemaName()), zap.String("tableName", params.SourceReq.GetName())) + } assert.Assert(req.createTableResult != nil) if err := req.p.params.OnSucceeded(*req.createTableResult); err != nil { @@ -229,7 +310,6 @@ func (p *Procedure) Kind() procedure.Kind { func (p *Procedure) Start(ctx context.Context) error { p.updateState(procedure.StateRunning) - // Try to load persist data. 
req := &callbackRequest{ ctx: ctx, p: p, @@ -239,6 +319,16 @@ func (p *Procedure) Start(ctx context.Context) error { for { switch p.fsm.Current() { case stateBegin: + if err := p.fsm.Event(eventCheckTableExists, req); err != nil { + _ = p.params.OnFailed(err) + return err + } + case stateCheckTableExists: + if err := p.fsm.Event(eventCreateTableAssign, req); err != nil { + _ = p.params.OnFailed(err) + return err + } + case stateCreateTableAssign: if err := p.fsm.Event(eventCreateMetadata, req); err != nil { _ = p.params.OnFailed(err) return err diff --git a/server/coordinator/procedure/error.go b/server/coordinator/procedure/error.go index 165cb2cb..c59dc8fc 100644 --- a/server/coordinator/procedure/error.go +++ b/server/coordinator/procedure/error.go @@ -23,6 +23,7 @@ import "github.com/apache/incubator-horaedb-meta/pkg/coderr" var ( ErrShardLeaderNotFound = coderr.NewCodeError(coderr.Internal, "shard leader not found") + ErrShardNotMatch = coderr.NewCodeError(coderr.Internal, "target shard not match to persis data") ErrProcedureNotFound = coderr.NewCodeError(coderr.Internal, "procedure not found") ErrClusterConfigChanged = coderr.NewCodeError(coderr.Internal, "cluster config changed") ErrTableNotExists = coderr.NewCodeError(coderr.Internal, "table not exists") From 4c1cd585dcbe395c751138dea0933ea512631eb2 Mon Sep 17 00:00:00 2001 From: CooooolFrog Date: Wed, 20 Dec 2023 17:21:38 +0800 Subject: [PATCH 05/13] fix ci --- server/cluster/metadata/topology_manager.go | 1 - server/etcdutil/util.go | 3 ++- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/server/cluster/metadata/topology_manager.go b/server/cluster/metadata/topology_manager.go index 7719047c..cf6fccbc 100644 --- a/server/cluster/metadata/topology_manager.go +++ b/server/cluster/metadata/topology_manager.go @@ -667,7 +667,6 @@ func (m *TopologyManagerImpl) loadAssignTable(ctx context.Context, schemas []sto if err != nil { return errors.WithMessage(err, "storage list assign table") } - //m.tableAssignMapping[schema.ID] = make(map[string]storage.ShardID, len(listAssignTableResult.TableAssigns)) for _, assignTable := range listAssignTableResult.TableAssigns { m.tableAssignMapping[schema.ID][assignTable.TableName] = assignTable.ShardID } diff --git a/server/etcdutil/util.go b/server/etcdutil/util.go index 6ec1f062..2fcc53df 100644 --- a/server/etcdutil/util.go +++ b/server/etcdutil/util.go @@ -19,10 +19,11 @@ package etcdutil import ( "context" + "path" + "github.com/CeresDB/ceresmeta/pkg/log" clientv3 "go.etcd.io/etcd/client/v3" "go.uber.org/zap" - "path" ) func Get(ctx context.Context, client *clientv3.Client, key string) (string, error) { From a1156687991b648b11e6917b60af4cdb156fce55 Mon Sep 17 00:00:00 2001 From: CooooolFrog Date: Wed, 10 Jan 2024 15:58:13 +0800 Subject: [PATCH 06/13] refactor by cr --- server/cluster/metadata/cluster_metadata.go | 8 ++++---- server/cluster/metadata/topology_manager.go | 16 ++++++++-------- 2 files changed, 12 insertions(+), 12 deletions(-) diff --git a/server/cluster/metadata/cluster_metadata.go b/server/cluster/metadata/cluster_metadata.go index 38e44785..f6994679 100644 --- a/server/cluster/metadata/cluster_metadata.go +++ b/server/cluster/metadata/cluster_metadata.go @@ -282,7 +282,7 @@ func (c *ClusterMetadata) GetTable(schemaName, tableName string) (storage.Table, // GetTableShard get the shard where the table actually exists. 
func (c *ClusterMetadata) GetTableShard(ctx context.Context, table storage.Table) (storage.ShardID, bool) { - return c.topologyManager.GetTableShard(ctx, table) + return c.topologyManager.GetTableShardID(ctx, table) } func (c *ClusterMetadata) CreateTableMetadata(ctx context.Context, request CreateTableMetadataRequest) (CreateTableMetadataResult, error) { @@ -403,7 +403,7 @@ func (c *ClusterMetadata) GetAssignTable(ctx context.Context, schemaName string, if !exists { return 0, false, errors.WithMessagef(ErrSchemaNotFound, "schema %s not found", schemaName) } - shardIDs, exists := c.topologyManager.GetAssignTableResult(ctx, schema.ID, tableName) + shardIDs, exists := c.topologyManager.GetAssignTableShard(ctx, schema.ID, tableName) return shardIDs, exists, nil } @@ -412,7 +412,7 @@ func (c *ClusterMetadata) AssignTable(ctx context.Context, schemaName string, ta if !exists { return errors.WithMessagef(ErrSchemaNotFound, "schema %s not found", schemaName) } - return c.topologyManager.AssignTable(ctx, schema.ID, tableName, shardID) + return c.topologyManager.AssignTableToShard(ctx, schema.ID, tableName, shardID) } func (c *ClusterMetadata) DeleteAssignTable(ctx context.Context, schemaName string, tableName string) error { @@ -420,7 +420,7 @@ func (c *ClusterMetadata) DeleteAssignTable(ctx context.Context, schemaName stri if !exists { return errors.WithMessagef(ErrSchemaNotFound, "schema %s not found", schemaName) } - return c.topologyManager.DeleteAssignTable(ctx, schema.ID, tableName) + return c.topologyManager.DeleteTableAssignedShard(ctx, schema.ID, tableName) } func (c *ClusterMetadata) GetShards() []storage.ShardID { diff --git a/server/cluster/metadata/topology_manager.go b/server/cluster/metadata/topology_manager.go index cf6fccbc..59f8e3cb 100644 --- a/server/cluster/metadata/topology_manager.go +++ b/server/cluster/metadata/topology_manager.go @@ -45,13 +45,13 @@ type TopologyManager interface { // RemoveTable remove table on target shards from cluster topology. RemoveTable(ctx context.Context, shardID storage.ShardID, latestVersion uint64, tableIDs []storage.TableID) error // GetTableShard get the shardID of the shard where the table is located. - GetTableShard(ctx context.Context, table storage.Table) (storage.ShardID, bool) + GetTableShardID(ctx context.Context, table storage.Table) (storage.ShardID, bool) // AssignTable persistent table shard mapping, it is used to store assign results and make the table creation idempotent. - AssignTable(ctx context.Context, schemaID storage.SchemaID, tableName string, shardID storage.ShardID) error + AssignTableToShard(ctx context.Context, schemaID storage.SchemaID, tableName string, shardID storage.ShardID) error // GetAssignTableResult get table assign result. - GetAssignTableResult(ctx context.Context, schemaID storage.SchemaID, tableName string) (storage.ShardID, bool) + GetAssignTableShard(ctx context.Context, schemaID storage.SchemaID, tableName string) (storage.ShardID, bool) // DeleteAssignTable delete table assign result. - DeleteAssignTable(ctx context.Context, schemaID storage.SchemaID, tableName string) error + DeleteTableAssignedShard(ctx context.Context, schemaID storage.SchemaID, tableName string) error // GetShards get all shards in cluster topology. GetShards() []storage.ShardID // GetShardNodesByID get shardNodes with shardID. 
@@ -310,7 +310,7 @@ func (m *TopologyManagerImpl) RemoveTable(ctx context.Context, shardID storage.S return nil } -func (m *TopologyManagerImpl) GetTableShard(_ context.Context, table storage.Table) (storage.ShardID, bool) { +func (m *TopologyManagerImpl) GetTableShardID(_ context.Context, table storage.Table) (storage.ShardID, bool) { m.lock.RLock() defer m.lock.RUnlock() @@ -322,7 +322,7 @@ func (m *TopologyManagerImpl) GetTableShard(_ context.Context, table storage.Tab return 0, false } -func (m *TopologyManagerImpl) AssignTable(ctx context.Context, schemaID storage.SchemaID, tableName string, shardID storage.ShardID) error { +func (m *TopologyManagerImpl) AssignTableToShard(ctx context.Context, schemaID storage.SchemaID, tableName string, shardID storage.ShardID) error { m.lock.Lock() defer m.lock.Unlock() @@ -345,12 +345,12 @@ func (m *TopologyManagerImpl) AssignTable(ctx context.Context, schemaID storage. return nil } -func (m *TopologyManagerImpl) GetAssignTableResult(_ context.Context, schemaID storage.SchemaID, tableName string) (storage.ShardID, bool) { +func (m *TopologyManagerImpl) GetAssignTableShard(_ context.Context, schemaID storage.SchemaID, tableName string) (storage.ShardID, bool) { assignResult, exists := m.tableAssignMapping[schemaID][tableName] return assignResult, exists } -func (m *TopologyManagerImpl) DeleteAssignTable(ctx context.Context, schemaID storage.SchemaID, tableName string) error { +func (m *TopologyManagerImpl) DeleteTableAssignedShard(ctx context.Context, schemaID storage.SchemaID, tableName string) error { m.lock.Lock() defer m.lock.Unlock() From 71792f28ab30f77cda7cf7cba7720a2575f6478c Mon Sep 17 00:00:00 2001 From: CooooolFrog Date: Wed, 10 Jan 2024 16:43:47 +0800 Subject: [PATCH 07/13] rebase on main --- server/coordinator/procedure/storage_impl.go | 29 +++++++++++--------- server/etcdutil/util.go | 14 ++++++---- 2 files changed, 24 insertions(+), 19 deletions(-) diff --git a/server/coordinator/procedure/storage_impl.go b/server/coordinator/procedure/storage_impl.go index 95a0c2d2..3d60cd05 100644 --- a/server/coordinator/procedure/storage_impl.go +++ b/server/coordinator/procedure/storage_impl.go @@ -1,17 +1,20 @@ /* - * Copyright 2022 The CeresDB Authors + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. */ package procedure @@ -24,8 +27,8 @@ import ( "path" "strconv" - "github.com/CeresDB/ceresmeta/pkg/log" - "github.com/CeresDB/ceresmeta/server/etcdutil" + "github.com/apache/incubator-horaedb-meta/pkg/log" + "github.com/apache/incubator-horaedb-meta/server/etcdutil" "github.com/pkg/errors" clientv3 "go.etcd.io/etcd/client/v3" "go.etcd.io/etcd/client/v3/clientv3util" diff --git a/server/etcdutil/util.go b/server/etcdutil/util.go index 2fcc53df..1b7b6e89 100644 --- a/server/etcdutil/util.go +++ b/server/etcdutil/util.go @@ -1,9 +1,11 @@ /* - * Copyright 2022 The CeresDB Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * @@ -21,7 +23,7 @@ import ( "context" "path" - "github.com/CeresDB/ceresmeta/pkg/log" + "github.com/apache/incubator-horaedb-meta/pkg/log" clientv3 "go.etcd.io/etcd/client/v3" "go.uber.org/zap" ) From eee655cb003e022651a02b699409498efb36b4f0 Mon Sep 17 00:00:00 2001 From: CooooolFrog Date: Mon, 22 Jan 2024 14:31:05 +0800 Subject: [PATCH 08/13] add unit test --- server/cluster/cluster.go | 2 +- server/coordinator/factory.go | 21 +++--- server/coordinator/factory_test.go | 2 +- server/coordinator/persist_shard_picker.go | 62 ++++++++++++++++ .../coordinator/persist_shard_picker_test.go | 67 +++++++++++++++++ .../procedure/ddl/createtable/create_table.go | 71 ++++--------------- .../manager/scheduler_manager_test.go | 2 +- .../scheduler/rebalanced/scheduler_test.go | 10 +-- .../scheduler/reopen/scheduler_test.go | 4 +- .../scheduler/static/scheduler_test.go | 10 +-- 10 files changed, 172 insertions(+), 79 deletions(-) create mode 100644 server/coordinator/persist_shard_picker.go create mode 100644 server/coordinator/persist_shard_picker_test.go diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index 6af9b2a9..95136177 100644 --- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -58,7 +58,7 @@ func NewCluster(logger *zap.Logger, metadata *metadata.ClusterMetadata, client * dispatch := eventdispatch.NewDispatchImpl() procedureIDRootPath := strings.Join([]string{rootPath, metadata.Name(), defaultProcedurePrefixKey}, "/") - procedureFactory := coordinator.NewFactory(logger, id.NewAllocatorImpl(logger, client, procedureIDRootPath, defaultAllocStep), dispatch, procedureStorage) + procedureFactory := coordinator.NewFactory(logger, id.NewAllocatorImpl(logger, client, procedureIDRootPath, defaultAllocStep), dispatch, procedureStorage, metadata) schedulerManager := manager.NewManager(logger, procedureManager, procedureFactory, metadata, client, rootPath, metadata.GetTopologyType(), metadata.GetProcedureExecutingBatchSize()) diff --git a/server/coordinator/factory.go b/server/coordinator/factory.go index 561f7581..d7ea666f 100644 --- a/server/coordinator/factory.go +++ b/server/coordinator/factory.go @@ -39,11 +39,12 @@ import ( ) type Factory 
struct { - logger *zap.Logger - idAllocator id.Allocator - dispatch eventdispatch.Dispatch - storage procedure.Storage - shardPicker ShardPicker + logger *zap.Logger + idAllocator id.Allocator + dispatch eventdispatch.Dispatch + storage procedure.Storage + shardPicker *PersistShardPicker + clusterMetadata *metadata.ClusterMetadata } type CreateTableRequest struct { @@ -101,13 +102,13 @@ type BatchRequest struct { BatchType procedure.Kind } -func NewFactory(logger *zap.Logger, allocator id.Allocator, dispatch eventdispatch.Dispatch, storage procedure.Storage) *Factory { +func NewFactory(logger *zap.Logger, allocator id.Allocator, dispatch eventdispatch.Dispatch, storage procedure.Storage, clusterMetadata *metadata.ClusterMetadata) *Factory { return &Factory{ idAllocator: allocator, dispatch: dispatch, storage: storage, logger: logger, - shardPicker: NewLeastTableShardPicker(), + shardPicker: NewPersistShardPicker(clusterMetadata, NewLeastTableShardPicker()), } } @@ -137,7 +138,7 @@ func (f *Factory) makeCreateTableProcedure(ctx context.Context, request CreateTa if exists { targetShardID = shardID } else { - shards, err := f.shardPicker.PickShards(ctx, snapshot, 1) + shards, err := f.shardPicker.PickShards(ctx, snapshot, request.SourceReq.GetSchemaName(), []string{request.SourceReq.GetName()}) if err != nil { f.logger.Error("pick table shard", zap.Error(err)) return nil, errors.WithMessage(err, "pick table shard") @@ -146,7 +147,7 @@ func (f *Factory) makeCreateTableProcedure(ctx context.Context, request CreateTa f.logger.Error("pick table shards length not equal 1", zap.Int("shards", len(shards))) return nil, errors.WithMessagef(procedure.ErrPickShard, "pick table shard, shards length:%d", len(shards)) } - targetShardID = shards[0].ID + targetShardID = shards[request.SourceReq.GetName()].ID } return createtable.NewProcedure(createtable.ProcedureParams{ @@ -174,7 +175,7 @@ func (f *Factory) makeCreatePartitionTableProcedure(ctx context.Context, request nodeNames[shardNode.NodeName] = 1 } - subTableShards, err := f.shardPicker.PickShards(ctx, snapshot, len(request.SourceReq.PartitionTableInfo.SubTableNames)) + subTableShards, err := f.shardPicker.PickShards(ctx, snapshot, request.SourceReq.GetSchemaName(), request.SourceReq.PartitionTableInfo.SubTableNames) if err != nil { return nil, errors.WithMessage(err, "pick sub table shards") } diff --git a/server/coordinator/factory_test.go b/server/coordinator/factory_test.go index 336255a1..04c83c67 100644 --- a/server/coordinator/factory_test.go +++ b/server/coordinator/factory_test.go @@ -39,7 +39,7 @@ func setupFactory(t *testing.T) (*coordinator.Factory, *metadata.ClusterMetadata dispatch := test.MockDispatch{} allocator := test.MockIDAllocator{} storage := test.NewTestStorage(t) - f := coordinator.NewFactory(zap.NewNop(), allocator, dispatch, storage) + f := coordinator.NewFactory(zap.NewNop(), allocator, dispatch, storage, c.GetMetadata()) return f, c.GetMetadata() } diff --git a/server/coordinator/persist_shard_picker.go b/server/coordinator/persist_shard_picker.go new file mode 100644 index 00000000..887f3011 --- /dev/null +++ b/server/coordinator/persist_shard_picker.go @@ -0,0 +1,62 @@ +package coordinator + +import ( + "context" + "github.com/apache/incubator-horaedb-meta/server/cluster/metadata" + "github.com/apache/incubator-horaedb-meta/server/storage" + "github.com/pkg/errors" +) + +type PersistShardPicker struct { + cluster *metadata.ClusterMetadata + internal ShardPicker +} + +func NewPersistShardPicker(cluster 
*metadata.ClusterMetadata, internal ShardPicker) *PersistShardPicker { + return &PersistShardPicker{cluster: cluster, internal: internal} +} + +func (p *PersistShardPicker) PickShards(ctx context.Context, snapshot metadata.Snapshot, schemaName string, tableNames []string) (map[string]storage.ShardNode, error) { + result := map[string]storage.ShardNode{} + + shardNodeMap := map[storage.ShardID]storage.ShardNode{} + for _, shardNode := range snapshot.Topology.ClusterView.ShardNodes { + shardNodeMap[shardNode.ID] = shardNode + } + + // If table assign has been created, just reuse it. + for i := 0; i < len(tableNames); i++ { + shardID, exists, err := p.cluster.GetAssignTable(ctx, schemaName, tableNames[i]) + if err != nil { + return map[string]storage.ShardNode{}, err + } + if exists { + result[tableNames[i]] = shardNodeMap[shardID] + } + } + + if len(result) == len(tableNames) { + return result, nil + } + + if len(result) != len(tableNames) && len(result) != 0 { + // TODO: Should all table assigns be cleared? + return result, errors.WithMessagef(ErrPickNode, "The number of table assign %d is inconsistent with the number of tables %d", len(result), len(tableNames)) + } + + // No table assign has been created, try to pick shard and save table assigns. + shardNodes, err := p.internal.PickShards(ctx, snapshot, len(tableNames)) + if err != nil { + return map[string]storage.ShardNode{}, err + } + + for i, shardNode := range shardNodes { + result[tableNames[i]] = shardNode + err = p.cluster.AssignTable(ctx, schemaName, tableNames[i], shardNode.ID) + if err != nil { + return map[string]storage.ShardNode{}, err + } + } + + return result, nil +} diff --git a/server/coordinator/persist_shard_picker_test.go b/server/coordinator/persist_shard_picker_test.go new file mode 100644 index 00000000..984ffd4f --- /dev/null +++ b/server/coordinator/persist_shard_picker_test.go @@ -0,0 +1,67 @@ +package coordinator_test + +import ( + "context" + "github.com/apache/incubator-horaedb-meta/server/cluster/metadata" + "github.com/apache/incubator-horaedb-meta/server/coordinator" + "github.com/apache/incubator-horaedb-meta/server/coordinator/procedure/test" + "github.com/apache/incubator-horaedb-meta/server/storage" + "github.com/stretchr/testify/require" + "testing" +) + +func TestPersistShardPicker(t *testing.T) { + re := require.New(t) + ctx := context.Background() + + c := test.InitStableCluster(ctx, t) + + persistShardPicker := coordinator.NewPersistShardPicker(c.GetMetadata(), coordinator.NewLeastTableShardPicker()) + pickResult, err := persistShardPicker.PickShards(ctx, c.GetMetadata().GetClusterSnapshot(), test.TestSchemaName, []string{test.TestTableName0}) + re.NoError(err) + re.Equal(len(pickResult), 1) + + createResult, err := c.GetMetadata().CreateTable(ctx, metadata.CreateTableRequest{ + ShardID: pickResult[test.TestTableName0].ID, + LatestVersion: 0, + SchemaName: test.TestSchemaName, + TableName: test.TestTableName0, + PartitionInfo: storage.PartitionInfo{Info: nil}, + }) + re.NoError(err) + re.Equal(test.TestTableName0, createResult.Table.Name) + + // Try to pick shard for same table after the table is created. + newPickResult, err := persistShardPicker.PickShards(ctx, c.GetMetadata().GetClusterSnapshot(), test.TestSchemaName, []string{test.TestTableName0}) + re.NoError(err) + re.Equal(len(newPickResult), 1) + re.Equal(newPickResult[test.TestTableName0], pickResult[test.TestTableName0]) + + // Try to pick shard for another table. 
+ pickResult, err = persistShardPicker.PickShards(ctx, c.GetMetadata().GetClusterSnapshot(), test.TestSchemaName, []string{test.TestTableName1}) + re.NoError(err) + re.Equal(len(pickResult), 1) + + err = c.GetMetadata().DropTable(ctx, metadata.DropTableRequest{ + SchemaName: test.TestSchemaName, + TableName: test.TestTableName0, + ShardID: pickResult[test.TestTableName0].ID, + LatestVersion: 0, + }) + re.NoError(err) + + // Try to pick shard for table1 after drop table0. + newPickResult, err = persistShardPicker.PickShards(ctx, c.GetMetadata().GetClusterSnapshot(), test.TestSchemaName, []string{test.TestTableName1}) + re.NoError(err) + re.Equal(len(pickResult), 1) + re.Equal(newPickResult[test.TestTableName1], pickResult[test.TestTableName1]) + + err = c.GetMetadata().DeleteAssignTable(ctx, test.TestSchemaName, test.TestTableName1) + re.NoError(err) + + // Try to pick another for table1 after drop table1 assign result. + newPickResult, err = persistShardPicker.PickShards(ctx, c.GetMetadata().GetClusterSnapshot(), test.TestSchemaName, []string{test.TestTableName1}) + re.NoError(err) + re.Equal(len(pickResult), 1) + re.NotEqual(newPickResult[test.TestTableName1], pickResult[test.TestTableName1]) +} diff --git a/server/coordinator/procedure/ddl/createtable/create_table.go b/server/coordinator/procedure/ddl/createtable/create_table.go index 63dc3ad5..dda8cf89 100644 --- a/server/coordinator/procedure/ddl/createtable/create_table.go +++ b/server/coordinator/procedure/ddl/createtable/create_table.go @@ -37,34 +37,30 @@ import ( ) const ( - eventCheckTableExists = "EventCheckTableExists" - eventCreateTableAssign = "EventCreateTableAssign" - eventCreateMetadata = "EventCreateMetadata" - eventCreateOnShard = "EventCreateOnShard" - eventFinish = "EventFinish" - - stateBegin = "StateBegin" - stateCheckTableExists = "StateCheckTableExists" - stateCreateTableAssign = "StateCreateTableAssign" - stateCreateMetadata = "StateCreateMetadata" - stateCreateOnShard = "StateCreateOnShard" - stateFinish = "StateFinish" + eventCheckTableExists = "EventCheckTableExists" + eventCreateMetadata = "EventCreateMetadata" + eventCreateOnShard = "EventCreateOnShard" + eventFinish = "EventFinish" + + stateBegin = "StateBegin" + stateCheckTableExists = "StateCheckTableExists" + stateCreateMetadata = "StateCreateMetadata" + stateCreateOnShard = "StateCreateOnShard" + stateFinish = "StateFinish" ) var ( createTableEvents = fsm.Events{ {Name: eventCheckTableExists, Src: []string{stateBegin}, Dst: stateCheckTableExists}, - {Name: eventCreateTableAssign, Src: []string{stateCheckTableExists}, Dst: stateCreateTableAssign}, - {Name: eventCreateMetadata, Src: []string{stateCreateTableAssign}, Dst: stateCreateMetadata}, + {Name: eventCreateMetadata, Src: []string{stateCheckTableExists}, Dst: stateCreateMetadata}, {Name: eventCreateOnShard, Src: []string{stateCreateMetadata}, Dst: stateCreateOnShard}, {Name: eventFinish, Src: []string{stateCreateOnShard}, Dst: stateFinish}, } createTableCallbacks = fsm.Callbacks{ - eventCheckTableExists: checkTableExists, - eventCreateTableAssign: createTableAssign, - eventCreateMetadata: createMetadata, - eventCreateOnShard: createOnShard, - eventFinish: createFinish, + eventCheckTableExists: checkTableExists, + eventCreateMetadata: createMetadata, + eventCreateOnShard: createOnShard, + eventFinish: createFinish, } ) @@ -94,38 +90,6 @@ func checkTableExists(event *fsm.Event) { } } -func createTableAssign(event *fsm.Event) { - req, err := procedure.GetRequestFromEvent[*callbackRequest](event) - if err 
!= nil { - procedure.CancelEventWithLog(event, err, "get request from event") - return - } - params := req.p.params - - schemaName := params.SourceReq.GetSchemaName() - tableName := params.SourceReq.GetName() - - targetShardID, exists, err := params.ClusterMetadata.GetAssignTable(req.ctx, schemaName, tableName) - if err != nil { - procedure.CancelEventWithLog(event, err, "get table assign", zap.String("schemaName", schemaName), zap.String("tableName", tableName)) - return - } - if exists { - if targetShardID != params.ShardID { - procedure.CancelEventWithLog(event, procedure.ErrShardNotMatch, "target shard not match to persist data", zap.String("schemaName", schemaName), zap.String("tableName", tableName), zap.Uint32("targetShardID", uint32(targetShardID)), zap.Uint32("persistShardID", uint32(params.ShardID))) - return - } - return - } - - if err := params.ClusterMetadata.AssignTable(req.ctx, schemaName, tableName, params.ShardID); err != nil { - procedure.CancelEventWithLog(event, err, "persist table assign") - return - } - - log.Debug("create table assign finish", zap.String("schemaName", schemaName), zap.String("tableName", tableName)) -} - func createMetadata(event *fsm.Event) { req, err := procedure.GetRequestFromEvent[*callbackRequest](event) if err != nil { @@ -324,11 +288,6 @@ func (p *Procedure) Start(ctx context.Context) error { return err } case stateCheckTableExists: - if err := p.fsm.Event(eventCreateTableAssign, req); err != nil { - _ = p.params.OnFailed(err) - return err - } - case stateCreateTableAssign: if err := p.fsm.Event(eventCreateMetadata, req); err != nil { _ = p.params.OnFailed(err) return err diff --git a/server/coordinator/scheduler/manager/scheduler_manager_test.go b/server/coordinator/scheduler/manager/scheduler_manager_test.go index 15d2bd6c..7edbf5ec 100644 --- a/server/coordinator/scheduler/manager/scheduler_manager_test.go +++ b/server/coordinator/scheduler/manager/scheduler_manager_test.go @@ -44,7 +44,7 @@ func TestSchedulerManager(t *testing.T) { dispatch := test.MockDispatch{} allocator := test.MockIDAllocator{} s := test.NewTestStorage(t) - f := coordinator.NewFactory(zap.NewNop(), allocator, dispatch, s) + f := coordinator.NewFactory(zap.NewNop(), allocator, dispatch, s, c.GetMetadata()) _, client, _ := etcdutil.PrepareEtcdServerAndClient(t) // Create scheduler manager with enableScheduler equal to false. diff --git a/server/coordinator/scheduler/rebalanced/scheduler_test.go b/server/coordinator/scheduler/rebalanced/scheduler_test.go index e451b532..8c931572 100644 --- a/server/coordinator/scheduler/rebalanced/scheduler_test.go +++ b/server/coordinator/scheduler/rebalanced/scheduler_test.go @@ -35,23 +35,25 @@ func TestRebalancedScheduler(t *testing.T) { re := require.New(t) ctx := context.Background() - procedureFactory := coordinator.NewFactory(zap.NewNop(), test.MockIDAllocator{}, test.MockDispatch{}, test.NewTestStorage(t)) - - s := rebalanced.NewShardScheduler(zap.NewNop(), procedureFactory, nodepicker.NewConsistentUniformHashNodePicker(zap.NewNop()), 1) - // EmptyCluster would be scheduled an empty procedure. 
emptyCluster := test.InitEmptyCluster(ctx, t) + procedureFactory := coordinator.NewFactory(zap.NewNop(), test.MockIDAllocator{}, test.MockDispatch{}, test.NewTestStorage(t), emptyCluster.GetMetadata()) + s := rebalanced.NewShardScheduler(zap.NewNop(), procedureFactory, nodepicker.NewConsistentUniformHashNodePicker(zap.NewNop()), 1) result, err := s.Schedule(ctx, emptyCluster.GetMetadata().GetClusterSnapshot()) re.NoError(err) re.Empty(result) // PrepareCluster would be scheduled an empty procedure. prepareCluster := test.InitPrepareCluster(ctx, t) + procedureFactory = coordinator.NewFactory(zap.NewNop(), test.MockIDAllocator{}, test.MockDispatch{}, test.NewTestStorage(t), prepareCluster.GetMetadata()) + s = rebalanced.NewShardScheduler(zap.NewNop(), procedureFactory, nodepicker.NewConsistentUniformHashNodePicker(zap.NewNop()), 1) _, err = s.Schedule(ctx, prepareCluster.GetMetadata().GetClusterSnapshot()) re.NoError(err) // StableCluster with all shards assigned would be scheduled a load balance procedure. stableCluster := test.InitStableCluster(ctx, t) + procedureFactory = coordinator.NewFactory(zap.NewNop(), test.MockIDAllocator{}, test.MockDispatch{}, test.NewTestStorage(t), stableCluster.GetMetadata()) + s = rebalanced.NewShardScheduler(zap.NewNop(), procedureFactory, nodepicker.NewConsistentUniformHashNodePicker(zap.NewNop()), 1) _, err = s.Schedule(ctx, stableCluster.GetMetadata().GetClusterSnapshot()) re.NoError(err) } diff --git a/server/coordinator/scheduler/reopen/scheduler_test.go b/server/coordinator/scheduler/reopen/scheduler_test.go index 8060e2df..2b23051a 100644 --- a/server/coordinator/scheduler/reopen/scheduler_test.go +++ b/server/coordinator/scheduler/reopen/scheduler_test.go @@ -35,12 +35,12 @@ import ( func TestReopenShardScheduler(t *testing.T) { re := require.New(t) ctx := context.Background() + emptyCluster := test.InitEmptyCluster(ctx, t) - procedureFactory := coordinator.NewFactory(zap.NewNop(), test.MockIDAllocator{}, test.MockDispatch{}, test.NewTestStorage(t)) + procedureFactory := coordinator.NewFactory(zap.NewNop(), test.MockIDAllocator{}, test.MockDispatch{}, test.NewTestStorage(t), emptyCluster.GetMetadata()) s := reopen.NewShardScheduler(procedureFactory, 1) - emptyCluster := test.InitEmptyCluster(ctx, t) // ReopenShardScheduler should not schedule when cluster is not stable. result, err := s.Schedule(ctx, emptyCluster.GetMetadata().GetClusterSnapshot()) re.NoError(err) diff --git a/server/coordinator/scheduler/static/scheduler_test.go b/server/coordinator/scheduler/static/scheduler_test.go index 929a2f07..d1c9b83e 100644 --- a/server/coordinator/scheduler/static/scheduler_test.go +++ b/server/coordinator/scheduler/static/scheduler_test.go @@ -35,24 +35,26 @@ func TestStaticTopologyScheduler(t *testing.T) { re := require.New(t) ctx := context.Background() - procedureFactory := coordinator.NewFactory(zap.NewNop(), test.MockIDAllocator{}, test.MockDispatch{}, test.NewTestStorage(t)) - - s := static.NewShardScheduler(procedureFactory, nodepicker.NewConsistentUniformHashNodePicker(zap.NewNop()), 1) - // EmptyCluster would be scheduled an empty procedure. 
emptyCluster := test.InitEmptyCluster(ctx, t) + procedureFactory := coordinator.NewFactory(zap.NewNop(), test.MockIDAllocator{}, test.MockDispatch{}, test.NewTestStorage(t), emptyCluster.GetMetadata()) + s := static.NewShardScheduler(procedureFactory, nodepicker.NewConsistentUniformHashNodePicker(zap.NewNop()), 1) result, err := s.Schedule(ctx, emptyCluster.GetMetadata().GetClusterSnapshot()) re.NoError(err) re.Empty(result) // PrepareCluster would be scheduled a transfer leader procedure. prepareCluster := test.InitPrepareCluster(ctx, t) + procedureFactory = coordinator.NewFactory(zap.NewNop(), test.MockIDAllocator{}, test.MockDispatch{}, test.NewTestStorage(t), prepareCluster.GetMetadata()) + s = static.NewShardScheduler(procedureFactory, nodepicker.NewConsistentUniformHashNodePicker(zap.NewNop()), 1) result, err = s.Schedule(ctx, prepareCluster.GetMetadata().GetClusterSnapshot()) re.NoError(err) re.NotEmpty(result) // StableCluster with all shards assigned would be scheduled a transfer leader procedure by hash rule. stableCluster := test.InitStableCluster(ctx, t) + procedureFactory = coordinator.NewFactory(zap.NewNop(), test.MockIDAllocator{}, test.MockDispatch{}, test.NewTestStorage(t), stableCluster.GetMetadata()) + s = static.NewShardScheduler(procedureFactory, nodepicker.NewConsistentUniformHashNodePicker(zap.NewNop()), 1) result, err = s.Schedule(ctx, stableCluster.GetMetadata().GetClusterSnapshot()) re.NoError(err) re.NotEmpty(result) From 1815561daea6add2b42d07acd141b0f8acf50816 Mon Sep 17 00:00:00 2001 From: CooooolFrog Date: Mon, 22 Jan 2024 15:02:51 +0800 Subject: [PATCH 09/13] fix ci --- server/coordinator/factory.go | 11 +++++------ server/coordinator/persist_shard_picker.go | 1 + server/coordinator/persist_shard_picker_test.go | 3 ++- 3 files changed, 8 insertions(+), 7 deletions(-) diff --git a/server/coordinator/factory.go b/server/coordinator/factory.go index d7ea666f..7ae47e9f 100644 --- a/server/coordinator/factory.go +++ b/server/coordinator/factory.go @@ -39,12 +39,11 @@ import ( ) type Factory struct { - logger *zap.Logger - idAllocator id.Allocator - dispatch eventdispatch.Dispatch - storage procedure.Storage - shardPicker *PersistShardPicker - clusterMetadata *metadata.ClusterMetadata + logger *zap.Logger + idAllocator id.Allocator + dispatch eventdispatch.Dispatch + storage procedure.Storage + shardPicker *PersistShardPicker } type CreateTableRequest struct { diff --git a/server/coordinator/persist_shard_picker.go b/server/coordinator/persist_shard_picker.go index 887f3011..5073d266 100644 --- a/server/coordinator/persist_shard_picker.go +++ b/server/coordinator/persist_shard_picker.go @@ -2,6 +2,7 @@ package coordinator import ( "context" + "github.com/apache/incubator-horaedb-meta/server/cluster/metadata" "github.com/apache/incubator-horaedb-meta/server/storage" "github.com/pkg/errors" diff --git a/server/coordinator/persist_shard_picker_test.go b/server/coordinator/persist_shard_picker_test.go index 984ffd4f..481ea1cd 100644 --- a/server/coordinator/persist_shard_picker_test.go +++ b/server/coordinator/persist_shard_picker_test.go @@ -2,12 +2,13 @@ package coordinator_test import ( "context" + "testing" + "github.com/apache/incubator-horaedb-meta/server/cluster/metadata" "github.com/apache/incubator-horaedb-meta/server/coordinator" "github.com/apache/incubator-horaedb-meta/server/coordinator/procedure/test" "github.com/apache/incubator-horaedb-meta/server/storage" "github.com/stretchr/testify/require" - "testing" ) func TestPersistShardPicker(t *testing.T) { 
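The recurring change in the scheduler tests above is the new `coordinator.NewFactory` signature: because the factory now builds a `PersistShardPicker`, it has to be handed the metadata of the cluster it creates procedures for, so each test constructs the factory (and the scheduler on top of it) per cluster instead of once up front. A minimal sketch of that wiring, reusing the helpers that appear in the diffs (variable names here are illustrative only):

```go
// Wire a procedure factory to one cluster's metadata, then build a scheduler on top of it.
cluster := test.InitStableCluster(ctx, t)
factory := coordinator.NewFactory(
	zap.NewNop(),           // logger
	test.MockIDAllocator{}, // procedure ID allocator
	test.MockDispatch{},    // event dispatch stub
	test.NewTestStorage(t), // procedure storage
	cluster.GetMetadata(),  // cluster metadata, required by the persist shard picker
)
s := static.NewShardScheduler(factory, nodepicker.NewConsistentUniformHashNodePicker(zap.NewNop()), 1)
result, err := s.Schedule(ctx, cluster.GetMetadata().GetClusterSnapshot())
re.NoError(err)
re.NotEmpty(result)
```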
From 1c9cbc2af3f3adf4bb7abcbe9b448411af3c75f0 Mon Sep 17 00:00:00 2001 From: CooooolFrog Date: Mon, 22 Jan 2024 16:32:03 +0800 Subject: [PATCH 10/13] add license header --- server/coordinator/persist_shard_picker.go | 19 +++++++++++++++++++ .../coordinator/persist_shard_picker_test.go | 19 +++++++++++++++++++ 2 files changed, 38 insertions(+) diff --git a/server/coordinator/persist_shard_picker.go b/server/coordinator/persist_shard_picker.go index 5073d266..92ce3c5c 100644 --- a/server/coordinator/persist_shard_picker.go +++ b/server/coordinator/persist_shard_picker.go @@ -1,3 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + package coordinator import ( diff --git a/server/coordinator/persist_shard_picker_test.go b/server/coordinator/persist_shard_picker_test.go index 481ea1cd..2b44a131 100644 --- a/server/coordinator/persist_shard_picker_test.go +++ b/server/coordinator/persist_shard_picker_test.go @@ -1,3 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + package coordinator_test import ( From 04deabb6c71679a326e49677a324ae2146a06f77 Mon Sep 17 00:00:00 2001 From: CooooolFrog Date: Mon, 22 Jan 2024 16:38:45 +0800 Subject: [PATCH 11/13] fix error msg --- server/coordinator/procedure/ddl/createtable/create_table.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/coordinator/procedure/ddl/createtable/create_table.go b/server/coordinator/procedure/ddl/createtable/create_table.go index dda8cf89..39b7ba40 100644 --- a/server/coordinator/procedure/ddl/createtable/create_table.go +++ b/server/coordinator/procedure/ddl/createtable/create_table.go @@ -85,7 +85,7 @@ func checkTableExists(event *fsm.Event) { // Check whether the table shard mapping already exists. 
_, exists = params.ClusterMetadata.GetTableShard(req.ctx, table) if exists { - procedure.CancelEventWithLog(event, metadata.ErrTableAlreadyExists, "table already exists") + procedure.CancelEventWithLog(event, metadata.ErrTableAlreadyExists, "table shard already exists") return } } From 6a2387d1c8b38608c1e1df935a9764f36e799cba Mon Sep 17 00:00:00 2001 From: CooooolFrog Date: Tue, 23 Jan 2024 11:21:41 +0800 Subject: [PATCH 12/13] refactor by cr --- server/cluster/metadata/cluster_metadata.go | 8 ++-- server/cluster/metadata/topology_manager.go | 18 ++++----- server/coordinator/factory.go | 2 +- server/coordinator/persist_shard_picker.go | 20 ++++++---- .../coordinator/persist_shard_picker_test.go | 2 +- .../procedure/ddl/createtable/create_table.go | 2 +- server/etcdutil/util.go | 33 +++++++--------- server/etcdutil/util_test.go | 39 +++++++++++++++++++ server/storage/meta.go | 12 +++--- server/storage/storage_impl.go | 12 +++--- server/storage/types.go | 6 +-- 11 files changed, 97 insertions(+), 57 deletions(-) diff --git a/server/cluster/metadata/cluster_metadata.go b/server/cluster/metadata/cluster_metadata.go index f6994679..0ae03b6a 100644 --- a/server/cluster/metadata/cluster_metadata.go +++ b/server/cluster/metadata/cluster_metadata.go @@ -398,16 +398,16 @@ func (c *ClusterMetadata) CreateTable(ctx context.Context, request CreateTableRe return ret, nil } -func (c *ClusterMetadata) GetAssignTable(ctx context.Context, schemaName string, tableName string) (storage.ShardID, bool, error) { +func (c *ClusterMetadata) GetTableAssignedShard(ctx context.Context, schemaName string, tableName string) (storage.ShardID, bool, error) { schema, exists := c.tableManager.GetSchema(schemaName) if !exists { return 0, false, errors.WithMessagef(ErrSchemaNotFound, "schema %s not found", schemaName) } - shardIDs, exists := c.topologyManager.GetAssignTableShard(ctx, schema.ID, tableName) + shardIDs, exists := c.topologyManager.GetTableAssignedShard(ctx, schema.ID, tableName) return shardIDs, exists, nil } -func (c *ClusterMetadata) AssignTable(ctx context.Context, schemaName string, tableName string, shardID storage.ShardID) error { +func (c *ClusterMetadata) AssignTableToShard(ctx context.Context, schemaName string, tableName string, shardID storage.ShardID) error { schema, exists := c.tableManager.GetSchema(schemaName) if !exists { return errors.WithMessagef(ErrSchemaNotFound, "schema %s not found", schemaName) @@ -415,7 +415,7 @@ func (c *ClusterMetadata) AssignTable(ctx context.Context, schemaName string, ta return c.topologyManager.AssignTableToShard(ctx, schema.ID, tableName, shardID) } -func (c *ClusterMetadata) DeleteAssignTable(ctx context.Context, schemaName string, tableName string) error { +func (c *ClusterMetadata) DeleteTableAssignedShard(ctx context.Context, schemaName string, tableName string) error { schema, exists := c.tableManager.GetSchema(schemaName) if !exists { return errors.WithMessagef(ErrSchemaNotFound, "schema %s not found", schemaName) diff --git a/server/cluster/metadata/topology_manager.go b/server/cluster/metadata/topology_manager.go index 59f8e3cb..f744a5d9 100644 --- a/server/cluster/metadata/topology_manager.go +++ b/server/cluster/metadata/topology_manager.go @@ -44,13 +44,13 @@ type TopologyManager interface { AddTable(ctx context.Context, shardID storage.ShardID, latestVersion uint64, tables []storage.Table) error // RemoveTable remove table on target shards from cluster topology. 
RemoveTable(ctx context.Context, shardID storage.ShardID, latestVersion uint64, tableIDs []storage.TableID) error - // GetTableShard get the shardID of the shard where the table is located. + // GetTableShardID get the shardID of the shard where the table is located. GetTableShardID(ctx context.Context, table storage.Table) (storage.ShardID, bool) - // AssignTable persistent table shard mapping, it is used to store assign results and make the table creation idempotent. + // AssignTableToShard persistent table shard mapping, it is used to store assign results and make the table creation idempotent. AssignTableToShard(ctx context.Context, schemaID storage.SchemaID, tableName string, shardID storage.ShardID) error - // GetAssignTableResult get table assign result. - GetAssignTableShard(ctx context.Context, schemaID storage.SchemaID, tableName string) (storage.ShardID, bool) - // DeleteAssignTable delete table assign result. + // GetTableAssignedShard get table assign result. + GetTableAssignedShard(ctx context.Context, schemaID storage.SchemaID, tableName string) (storage.ShardID, bool) + // DeleteTableAssignedShard delete table assign result. DeleteTableAssignedShard(ctx context.Context, schemaID storage.SchemaID, tableName string) error // GetShards get all shards in cluster topology. GetShards() []storage.ShardID @@ -326,7 +326,7 @@ func (m *TopologyManagerImpl) AssignTableToShard(ctx context.Context, schemaID s m.lock.Lock() defer m.lock.Unlock() - if err := m.storage.AssignTable(ctx, storage.AssignTableRequest{ + if err := m.storage.AssignTableToShard(ctx, storage.AssignTableToShardRequest{ ClusterID: m.clusterID, SchemaID: schemaID, TableName: tableName, @@ -345,7 +345,7 @@ func (m *TopologyManagerImpl) AssignTableToShard(ctx context.Context, schemaID s return nil } -func (m *TopologyManagerImpl) GetAssignTableShard(_ context.Context, schemaID storage.SchemaID, tableName string) (storage.ShardID, bool) { +func (m *TopologyManagerImpl) GetTableAssignedShard(_ context.Context, schemaID storage.SchemaID, tableName string) (storage.ShardID, bool) { assignResult, exists := m.tableAssignMapping[schemaID][tableName] return assignResult, exists } @@ -354,7 +354,7 @@ func (m *TopologyManagerImpl) DeleteTableAssignedShard(ctx context.Context, sche m.lock.Lock() defer m.lock.Unlock() - if err := m.storage.DeleteAssignTable(ctx, storage.DeleteAssignTableRequest{ + if err := m.storage.DeleteTableAssignedShard(ctx, storage.DeleteTableAssignedRequest{ ClusterID: m.clusterID, SchemaID: schemaID, TableName: tableName, @@ -663,7 +663,7 @@ func (m *TopologyManagerImpl) loadAssignTable(ctx context.Context, schemas []sto for _, schema := range schemas { m.tableAssignMapping[schema.ID] = make(map[string]storage.ShardID, 0) - listAssignTableResult, err := m.storage.ListAssignTable(ctx, storage.ListAssignTableRequest{ClusterID: m.clusterID, SchemaID: schema.ID}) + listAssignTableResult, err := m.storage.ListTableAssignedShard(ctx, storage.ListAssignTableRequest{ClusterID: m.clusterID, SchemaID: schema.ID}) if err != nil { return errors.WithMessage(err, "storage list assign table") } diff --git a/server/coordinator/factory.go b/server/coordinator/factory.go index 7ae47e9f..9e12c7f1 100644 --- a/server/coordinator/factory.go +++ b/server/coordinator/factory.go @@ -130,7 +130,7 @@ func (f *Factory) makeCreateTableProcedure(ctx context.Context, request CreateTa snapshot := request.ClusterMetadata.GetClusterSnapshot() var targetShardID storage.ShardID - shardID, exists, err := 
request.ClusterMetadata.GetAssignTable(ctx, request.SourceReq.SchemaName, request.SourceReq.Name) + shardID, exists, err := request.ClusterMetadata.GetTableAssignedShard(ctx, request.SourceReq.SchemaName, request.SourceReq.Name) if err != nil { return nil, err } diff --git a/server/coordinator/persist_shard_picker.go b/server/coordinator/persist_shard_picker.go index 92ce3c5c..c2884bdc 100644 --- a/server/coordinator/persist_shard_picker.go +++ b/server/coordinator/persist_shard_picker.go @@ -24,7 +24,6 @@ import ( "github.com/apache/incubator-horaedb-meta/server/cluster/metadata" "github.com/apache/incubator-horaedb-meta/server/storage" - "github.com/pkg/errors" ) type PersistShardPicker struct { @@ -44,14 +43,17 @@ func (p *PersistShardPicker) PickShards(ctx context.Context, snapshot metadata.S shardNodeMap[shardNode.ID] = shardNode } + var missingTables []string // If table assign has been created, just reuse it. for i := 0; i < len(tableNames); i++ { - shardID, exists, err := p.cluster.GetAssignTable(ctx, schemaName, tableNames[i]) + shardID, exists, err := p.cluster.GetTableAssignedShard(ctx, schemaName, tableNames[i]) if err != nil { return map[string]storage.ShardNode{}, err } if exists { result[tableNames[i]] = shardNodeMap[shardID] + } else { + missingTables = append(missingTables, tableNames[i]) } } @@ -59,20 +61,22 @@ func (p *PersistShardPicker) PickShards(ctx context.Context, snapshot metadata.S return result, nil } - if len(result) != len(tableNames) && len(result) != 0 { - // TODO: Should all table assigns be cleared? - return result, errors.WithMessagef(ErrPickNode, "The number of table assign %d is inconsistent with the number of tables %d", len(result), len(tableNames)) + var tablesNeedToAssignShard []string + if len(missingTables) > 0 { + tablesNeedToAssignShard = missingTables + } else { + tablesNeedToAssignShard = tableNames } // No table assign has been created, try to pick shard and save table assigns. - shardNodes, err := p.internal.PickShards(ctx, snapshot, len(tableNames)) + shardNodes, err := p.internal.PickShards(ctx, snapshot, len(tablesNeedToAssignShard)) if err != nil { return map[string]storage.ShardNode{}, err } for i, shardNode := range shardNodes { - result[tableNames[i]] = shardNode - err = p.cluster.AssignTable(ctx, schemaName, tableNames[i], shardNode.ID) + result[tablesNeedToAssignShard[i]] = shardNode + err = p.cluster.AssignTableToShard(ctx, schemaName, tablesNeedToAssignShard[i], shardNode.ID) if err != nil { return map[string]storage.ShardNode{}, err } diff --git a/server/coordinator/persist_shard_picker_test.go b/server/coordinator/persist_shard_picker_test.go index 2b44a131..67e3aa7e 100644 --- a/server/coordinator/persist_shard_picker_test.go +++ b/server/coordinator/persist_shard_picker_test.go @@ -76,7 +76,7 @@ func TestPersistShardPicker(t *testing.T) { re.Equal(len(pickResult), 1) re.Equal(newPickResult[test.TestTableName1], pickResult[test.TestTableName1]) - err = c.GetMetadata().DeleteAssignTable(ctx, test.TestSchemaName, test.TestTableName1) + err = c.GetMetadata().DeleteTableAssignedShard(ctx, test.TestSchemaName, test.TestTableName1) re.NoError(err) // Try to pick another for table1 after drop table1 assign result. 
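With the renames above in place, `PersistShardPicker.PickShards` is idempotent across retries: it first looks up a persisted assignment for every requested table, reuses it when present, and only asks the inner `ShardPicker` for the tables that still lack one. A rough usage sketch under those assumptions (the inner picker, cluster `c`, and table names are illustrative, not part of the patch):

```go
// picker wraps an inner ShardPicker and persists its choices through ClusterMetadata.
picker := coordinator.NewPersistShardPicker(c.GetMetadata(), innerPicker)

// First call: nothing is assigned yet, so shards are picked and the mapping is persisted.
first, err := picker.PickShards(ctx, c.GetMetadata().GetClusterSnapshot(), test.TestSchemaName, []string{"t0", "t1"})
re.NoError(err)

// A second call (for example when a create-table procedure is retried) reuses the
// persisted assignments instead of re-running the inner picker, so the result is stable.
second, err := picker.PickShards(ctx, c.GetMetadata().GetClusterSnapshot(), test.TestSchemaName, []string{"t0", "t1"})
re.NoError(err)
re.Equal(first["t0"].ID, second["t0"].ID)
```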
diff --git a/server/coordinator/procedure/ddl/createtable/create_table.go b/server/coordinator/procedure/ddl/createtable/create_table.go index 39b7ba40..292f0900 100644 --- a/server/coordinator/procedure/ddl/createtable/create_table.go +++ b/server/coordinator/procedure/ddl/createtable/create_table.go @@ -181,7 +181,7 @@ func createFinish(event *fsm.Event) { } params := req.p.params - if err := req.p.params.ClusterMetadata.DeleteAssignTable(req.ctx, params.SourceReq.GetSchemaName(), params.SourceReq.GetName()); err != nil { + if err := req.p.params.ClusterMetadata.DeleteTableAssignedShard(req.ctx, params.SourceReq.GetSchemaName(), params.SourceReq.GetName()); err != nil { log.Warn("delete assign table failed", zap.String("schemaName", params.SourceReq.GetSchemaName()), zap.String("tableName", params.SourceReq.GetName())) } diff --git a/server/etcdutil/util.go b/server/etcdutil/util.go index 1b7b6e89..62ef0773 100644 --- a/server/etcdutil/util.go +++ b/server/etcdutil/util.go @@ -128,31 +128,28 @@ func Scan(ctx context.Context, client *clientv3.Client, startKey, endKey string, } } -func ScanWithPrefix(ctx context.Context, client *clientv3.Client, prefix string, batchSize int, do func(key string, val []byte) error) error { +func ScanWithPrefix(ctx context.Context, client *clientv3.Client, prefix string, do func(key string, val []byte) error) error { rangeEnd := clientv3.GetPrefixRangeEnd(prefix) + resp, err := client.Get(ctx, prefix, clientv3.WithRange(rangeEnd)) + if err != nil { + return ErrEtcdKVGet.WithCause(err) + } + // Check whether the keys are exhausted. + if len(resp.Kvs) == 0 { + return nil + } - for { - resp, err := client.Get(ctx, prefix, clientv3.WithRange(rangeEnd), clientv3.WithLimit(int64(batchSize))) + for _, item := range resp.Kvs { + err := do(string(item.Key), item.Value) if err != nil { - return ErrEtcdKVGet.WithCause(err) - } - // Check whether the keys are exhausted. - if len(resp.Kvs) == 0 { - return nil - } - - for _, item := range resp.Kvs { - err := do(string(item.Key), item.Value) - if err != nil { - return err - } + return err } - - rangeEnd = string(resp.Kvs[len(resp.Kvs)-1].Key) } + + return nil } -// GetLastPathSegment get +// GetLastPathSegment get the last path segment from completePath, path is split by '/'. func GetLastPathSegment(completePath string) string { return path.Base(path.Clean(completePath)) } diff --git a/server/etcdutil/util_test.go b/server/etcdutil/util_test.go index fa2532d6..6b816a4a 100644 --- a/server/etcdutil/util_test.go +++ b/server/etcdutil/util_test.go @@ -105,3 +105,42 @@ func TestScanFailed(t *testing.T) { err := Scan(ctx, client, startKey, endKey, 10, do) r.Equal(fakeErr, err) } + +func TestScanWithPrefix(t *testing.T) { + r := require.New(t) + + _, client, closeSrv := PrepareEtcdServerAndClient(t) + defer closeSrv() + ctx := context.Background() + + // Build keys with different prefix. + keys := []string{} + keys = append(keys, "/prefix/0") + keys = append(keys, "/prefix/1") + keys = append(keys, "/diff/0") + + // Put the keys. + for _, key := range keys { + // Let the value equal key for simplicity. 
+ val := key + _, err := client.Put(ctx, key, val) + r.NoError(err) + } + + var scanResult []string + do := func(key string, value []byte) error { + scanResult = append(scanResult, key) + return nil + } + err := ScanWithPrefix(ctx, client, "/prefix", do) + r.NoError(err) + r.Equal(len(scanResult), 2) +} + +func TestGetLastPathSegment(t *testing.T) { + r := require.New(t) + + path := "/prefix/a/b/c" + lastPathSegment := GetLastPathSegment(path) + r.Equal("c", lastPathSegment) +} diff --git a/server/storage/meta.go b/server/storage/meta.go index 563f601e..b56da682 100644 --- a/server/storage/meta.go +++ b/server/storage/meta.go @@ -57,12 +57,12 @@ type Storage interface { // DeleteTable delete table by table name in specified cluster and schema. DeleteTable(ctx context.Context, req DeleteTableRequest) error - // AssignTable save table assign result. - AssignTable(ctx context.Context, req AssignTableRequest) error - // DeleteAssignTable delete table assign result. - DeleteAssignTable(ctx context.Context, req DeleteAssignTableRequest) error - // ListAssignTable list table assign result. - ListAssignTable(ctx context.Context, req ListAssignTableRequest) (ListAssignTableResult, error) + // AssignTableToShard save table assign result. + AssignTableToShard(ctx context.Context, req AssignTableToShardRequest) error + // DeleteTableAssignedShard delete table assign result. + DeleteTableAssignedShard(ctx context.Context, req DeleteTableAssignedRequest) error + // ListTableAssignedShard list table assign result. + ListTableAssignedShard(ctx context.Context, req ListAssignTableRequest) (ListTableAssignedShardResult, error) // CreateShardViews create shard views in specified cluster. CreateShardViews(ctx context.Context, req CreateShardViewsRequest) error diff --git a/server/storage/storage_impl.go b/server/storage/storage_impl.go index 60fb8080..e6b130e8 100644 --- a/server/storage/storage_impl.go +++ b/server/storage/storage_impl.go @@ -418,7 +418,7 @@ func (s *metaStorageImpl) DeleteTable(ctx context.Context, req DeleteTableReques return nil } -func (s *metaStorageImpl) AssignTable(ctx context.Context, req AssignTableRequest) error { +func (s *metaStorageImpl) AssignTableToShard(ctx context.Context, req AssignTableToShardRequest) error { key := makeTableAssignKey(s.rootPath, uint32(req.ClusterID), uint32(req.SchemaID), req.TableName) // Check if the key exists, if not,save table assign result; Otherwise, the table assign result already exists and return an error. 
@@ -439,7 +439,7 @@ func (s *metaStorageImpl) AssignTable(ctx context.Context, req AssignTableReques return nil } -func (s *metaStorageImpl) DeleteAssignTable(ctx context.Context, req DeleteAssignTableRequest) error { +func (s *metaStorageImpl) DeleteTableAssignedShard(ctx context.Context, req DeleteTableAssignedRequest) error { key := makeTableAssignKey(s.rootPath, uint32(req.ClusterID), uint32(req.SchemaID), req.TableName) keyExists := clientv3util.KeyExists(key) @@ -459,7 +459,7 @@ func (s *metaStorageImpl) DeleteAssignTable(ctx context.Context, req DeleteAssig return nil } -func (s *metaStorageImpl) ListAssignTable(ctx context.Context, req ListAssignTableRequest) (ListAssignTableResult, error) { +func (s *metaStorageImpl) ListTableAssignedShard(ctx context.Context, req ListAssignTableRequest) (ListTableAssignedShardResult, error) { key := makeTableAssignPrefixKey(s.rootPath, uint32(req.ClusterID), uint32(req.SchemaID)) rangeLimit := s.opts.MaxScanLimit @@ -478,11 +478,11 @@ func (s *metaStorageImpl) ListAssignTable(ctx context.Context, req ListAssignTab return nil } - if err := etcdutil.ScanWithPrefix(ctx, s.client, key, rangeLimit, do); err != nil { - return ListAssignTableResult{}, errors.WithMessagef(err, "scan tables, clusterID:%d, schemaID:%d, prefix key:%s, range limit:%d", req.ClusterID, req.SchemaID, key, rangeLimit) + if err := etcdutil.ScanWithPrefix(ctx, s.client, key, do); err != nil { + return ListTableAssignedShardResult{}, errors.WithMessagef(err, "scan tables, clusterID:%d, schemaID:%d, prefix key:%s, range limit:%d", req.ClusterID, req.SchemaID, key, rangeLimit) } - return ListAssignTableResult{TableAssigns: tableAssigns}, nil + return ListTableAssignedShardResult{TableAssigns: tableAssigns}, nil } func (s *metaStorageImpl) createNShardViews(ctx context.Context, clusterID ClusterID, shardViews []ShardView, ifConds []clientv3.Cmp, opCreates []clientv3.Op) error { diff --git a/server/storage/types.go b/server/storage/types.go index 62295499..5711283a 100644 --- a/server/storage/types.go +++ b/server/storage/types.go @@ -141,14 +141,14 @@ type DeleteTableRequest struct { TableName string } -type AssignTableRequest struct { +type AssignTableToShardRequest struct { ClusterID ClusterID SchemaID SchemaID TableName string ShardID ShardID } -type DeleteAssignTableRequest struct { +type DeleteTableAssignedRequest struct { ClusterID ClusterID SchemaID SchemaID TableName string @@ -159,7 +159,7 @@ type ListAssignTableRequest struct { SchemaID SchemaID } -type ListAssignTableResult struct { +type ListTableAssignedShardResult struct { TableAssigns []TableAssign } From 9d5bcc09bde0a94e322705b733d6e7792aaeef90 Mon Sep 17 00:00:00 2001 From: CooooolFrog Date: Tue, 23 Jan 2024 15:16:42 +0800 Subject: [PATCH 13/13] refactor by cr --- server/coordinator/persist_shard_picker.go | 18 ++++++------------ 1 file changed, 6 insertions(+), 12 deletions(-) diff --git a/server/coordinator/persist_shard_picker.go b/server/coordinator/persist_shard_picker.go index c2884bdc..b3684221 100644 --- a/server/coordinator/persist_shard_picker.go +++ b/server/coordinator/persist_shard_picker.go @@ -38,7 +38,7 @@ func NewPersistShardPicker(cluster *metadata.ClusterMetadata, internal ShardPick func (p *PersistShardPicker) PickShards(ctx context.Context, snapshot metadata.Snapshot, schemaName string, tableNames []string) (map[string]storage.ShardNode, error) { result := map[string]storage.ShardNode{} - shardNodeMap := map[storage.ShardID]storage.ShardNode{} + shardNodeMap := 
make(map[storage.ShardID]storage.ShardNode, len(tableNames)) for _, shardNode := range snapshot.Topology.ClusterView.ShardNodes { shardNodeMap[shardNode.ID] = shardNode } @@ -57,26 +57,20 @@ func (p *PersistShardPicker) PickShards(ctx context.Context, snapshot metadata.S } } - if len(result) == len(tableNames) { + // All table has been assigned to shard. + if len(missingTables) == 0 { return result, nil } - var tablesNeedToAssignShard []string - if len(missingTables) > 0 { - tablesNeedToAssignShard = missingTables - } else { - tablesNeedToAssignShard = tableNames - } - // No table assign has been created, try to pick shard and save table assigns. - shardNodes, err := p.internal.PickShards(ctx, snapshot, len(tablesNeedToAssignShard)) + shardNodes, err := p.internal.PickShards(ctx, snapshot, len(missingTables)) if err != nil { return map[string]storage.ShardNode{}, err } for i, shardNode := range shardNodes { - result[tablesNeedToAssignShard[i]] = shardNode - err = p.cluster.AssignTableToShard(ctx, schemaName, tablesNeedToAssignShard[i], shardNode.ID) + result[missingTables[i]] = shardNode + err = p.cluster.AssignTableToShard(ctx, schemaName, missingTables[i], shardNode.ID) if err != nil { return map[string]storage.ShardNode{}, err }
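Taken together, the series makes table creation resumable: the chosen shard for a table is persisted before the create-table procedure runs, reused if the procedure is retried, and cleaned up once the table exists in the topology. A condensed sketch of that lifecycle using the methods introduced above (surrounding procedure plumbing omitted, error handling abbreviated):

```go
// 1. Reuse a persisted assignment if one exists (see factory.makeCreateTableProcedure).
shardID, exists, err := clusterMetadata.GetTableAssignedShard(ctx, schemaName, tableName)
if err != nil {
	return err
}
if !exists {
	// 2. Otherwise pick a shard and persist the choice so a retry lands on the same shard.
	shardNodes, err := shardPicker.PickShards(ctx, snapshot, schemaName, []string{tableName})
	if err != nil {
		return err
	}
	shardID = shardNodes[tableName].ID
}

// 3. The procedure then creates the table metadata and opens the table on shardID
//    (StateCreateMetadata -> StateCreateOnShard in create_table.go above).

// 4. On finish, drop the assignment; the authoritative mapping now lives in the topology.
if err := clusterMetadata.DeleteTableAssignedShard(ctx, schemaName, tableName); err != nil {
	log.Warn("delete assign table failed",
		zap.String("schemaName", schemaName), zap.String("tableName", tableName))
}
```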