diff --git a/contrib/client-c b/contrib/client-c index 9bdd8241f6a..04d408143e5 160000 --- a/contrib/client-c +++ b/contrib/client-c @@ -1 +1 @@ -Subproject commit 9bdd8241f6ae05a46121f2f353c709c89615fd02 +Subproject commit 04d408143e5ceb01799ec642560c39dc1a0a373f diff --git a/contrib/kvproto b/contrib/kvproto index bfdb1d7eb15..145f0534a0e 160000 --- a/contrib/kvproto +++ b/contrib/kvproto @@ -1 +1 @@ -Subproject commit bfdb1d7eb15712cff7b6d833f757662da3a4f7f4 +Subproject commit 145f0534a0eb7e953c78563c4b50e3877c120409 diff --git a/dbms/src/Common/RedactHelpers.cpp b/dbms/src/Common/RedactHelpers.cpp index 39af6aa08f3..0d9a0d9a928 100644 --- a/dbms/src/Common/RedactHelpers.cpp +++ b/dbms/src/Common/RedactHelpers.cpp @@ -28,7 +28,7 @@ void Redact::setRedactLog(bool v) Redact::REDACT_LOG.store(v, std::memory_order_relaxed); } -std::string Redact::handleToDebugString(DB::HandleID handle) +std::string Redact::handleToDebugString(int64_t handle) { if (Redact::REDACT_LOG.load(std::memory_order_relaxed)) return "?"; diff --git a/dbms/src/Common/RedactHelpers.h b/dbms/src/Common/RedactHelpers.h index b1a681c80dc..d58f64b5f46 100644 --- a/dbms/src/Common/RedactHelpers.h +++ b/dbms/src/Common/RedactHelpers.h @@ -14,8 +14,6 @@ #pragma once -#include - #include #include @@ -29,7 +27,7 @@ class Redact public: static void setRedactLog(bool v); - static std::string handleToDebugString(DB::HandleID handle); + static std::string handleToDebugString(int64_t handle); static std::string keyToDebugString(const char * key, size_t size); static std::string keyToHexString(const char * key, size_t size); diff --git a/dbms/src/Common/tests/gtest_redact.cpp b/dbms/src/Common/tests/gtest_redact.cpp index 68407b1c2b4..70fe330f4c8 100644 --- a/dbms/src/Common/tests/gtest_redact.cpp +++ b/dbms/src/Common/tests/gtest_redact.cpp @@ -19,12 +19,12 @@ namespace DB { namespace tests { -TEST(RedactLog_test, Basic) +TEST(RedactLogTest, Basic) { const char * test_key = "\x01\x0a\xff"; const size_t key_sz = strlen(test_key); - const DB::HandleID test_handle = 10009; + const /*DB::HandleID*/ Int64 test_handle = 10009; Redact::setRedactLog(false); EXPECT_EQ(Redact::keyToDebugString(test_key, key_sz), "010AFF"); diff --git a/dbms/src/Debug/MockRaftStoreProxy.cpp b/dbms/src/Debug/MockRaftStoreProxy.cpp index 024ae1e506e..5263c361521 100644 --- a/dbms/src/Debug/MockRaftStoreProxy.cpp +++ b/dbms/src/Debug/MockRaftStoreProxy.cpp @@ -660,7 +660,7 @@ TableID MockRaftStoreProxy::bootstrap_table( UInt64 table_id = MockTiDB::instance().newTable("d", "t" + toString(random()), columns, tso, "", "dt"); auto schema_syncer = tmt.getSchemaSyncer(); - schema_syncer->syncSchemas(ctx); + schema_syncer->syncSchemas(ctx, NullspaceID); this->table_id = table_id; return table_id; } diff --git a/dbms/src/Debug/MockSchemaGetter.h b/dbms/src/Debug/MockSchemaGetter.h index 677455e895a..63219e91107 100644 --- a/dbms/src/Debug/MockSchemaGetter.h +++ b/dbms/src/Debug/MockSchemaGetter.h @@ -71,6 +71,8 @@ struct MockSchemaGetter } return res; } + + KeyspaceID getKeyspaceID() const { return NullspaceID; } }; } // namespace DB diff --git a/dbms/src/Debug/MockTiDB.cpp b/dbms/src/Debug/MockTiDB.cpp index 61c359ec298..85cc349fa67 100644 --- a/dbms/src/Debug/MockTiDB.cpp +++ b/dbms/src/Debug/MockTiDB.cpp @@ -82,9 +82,9 @@ TablePtr MockTiDB::dropTableInternal(Context & context, const String & database_ tables_by_id.erase(partition.id); if (drop_regions) { - for (auto & e : region_table.getRegionsByTable(partition.id)) + for (auto & e : region_table.getRegionsByTable(NullspaceID, partition.id)) kvstore->mockRemoveRegion(e.first, region_table); - region_table.removeTable(partition.id); + region_table.removeTable(NullspaceID, partition.id); } } } @@ -94,9 +94,9 @@ TablePtr MockTiDB::dropTableInternal(Context & context, const String & database_ if (drop_regions) { - for (auto & e : region_table.getRegionsByTable(table->id())) + for (auto & e : region_table.getRegionsByTable(NullspaceID, table->id())) kvstore->mockRemoveRegion(e.first, region_table); - region_table.removeTable(table->id()); + region_table.removeTable(NullspaceID, table->id()); } return table; diff --git a/dbms/src/Debug/dbgFuncMockRaftSnapshot.cpp b/dbms/src/Debug/dbgFuncMockRaftSnapshot.cpp index e9bd3816342..c613238316a 100644 --- a/dbms/src/Debug/dbgFuncMockRaftSnapshot.cpp +++ b/dbms/src/Debug/dbgFuncMockRaftSnapshot.cpp @@ -514,6 +514,7 @@ static GlobalRegionMap GLOBAL_REGION_MAP; /// Pre-decode region data into block cache and remove committed data from `region` RegionPtrWithBlock::CachePtr GenRegionPreDecodeBlockData(const RegionPtr & region, Context & context) { + auto keyspace_id = region->getKeyspaceID(); const auto & tmt = context.getTMTContext(); { Timestamp gc_safe_point = 0; @@ -556,7 +557,7 @@ RegionPtrWithBlock::CachePtr GenRegionPreDecodeBlockData(const RegionPtr & regio const auto atomic_decode = [&](bool force_decode) -> bool { Stopwatch watch; - auto storage = tmt.getStorages().get(table_id); + auto storage = tmt.getStorages().get(keyspace_id, table_id); if (storage == nullptr || storage->isTombstone()) { if (!force_decode) // Need to update. @@ -594,7 +595,7 @@ RegionPtrWithBlock::CachePtr GenRegionPreDecodeBlockData(const RegionPtr & regio if (!atomic_decode(false)) { - tmt.getSchemaSyncer()->syncSchemas(context); + tmt.getSchemaSyncer()->syncSchemas(context, keyspace_id); if (!atomic_decode(true)) throw Exception("Pre-decode " + region->toString() + " cache to table " + std::to_string(table_id) + " block failed", diff --git a/dbms/src/Debug/dbgFuncSchema.cpp b/dbms/src/Debug/dbgFuncSchema.cpp index 9ef07f16e8b..b6adeee48bd 100644 --- a/dbms/src/Debug/dbgFuncSchema.cpp +++ b/dbms/src/Debug/dbgFuncSchema.cpp @@ -66,7 +66,7 @@ void dbgFuncRefreshSchemas(Context & context, const ASTs &, DBGInvoker::Printer auto schema_syncer = tmt.getSchemaSyncer(); try { - schema_syncer->syncSchemas(context); + schema_syncer->syncSchemas(context, NullspaceID); } catch (Exception & e) { @@ -95,7 +95,7 @@ void dbgFuncGcSchemas(Context & context, const ASTs & args, DBGInvoker::Printer gc_safe_point = PDClientHelper::getGCSafePointWithRetry(context.getTMTContext().getPDClient()); else gc_safe_point = safeGet(typeid_cast(*args[0]).value); - service->gc(gc_safe_point); + service->gc(gc_safe_point, NullspaceID); output("schemas gc done"); } diff --git a/dbms/src/Debug/dbgFuncSchemaName.cpp b/dbms/src/Debug/dbgFuncSchemaName.cpp index a082b5438f3..6ed4c4213f2 100644 --- a/dbms/src/Debug/dbgFuncSchemaName.cpp +++ b/dbms/src/Debug/dbgFuncSchemaName.cpp @@ -173,7 +173,7 @@ void dbgFuncGetPartitionTablesTiflashReplicaCount(Context & context, const ASTs for (const auto & part_def : table_info.partition.definitions) { auto paritition_table_info = table_info.producePartitionTableInfo(part_def.id, name_mapper); - auto partition_storage = context.getTMTContext().getStorages().get(paritition_table_info->id); + auto partition_storage = context.getTMTContext().getStorages().get(NullspaceID, paritition_table_info->id); fmt_buf.append((std::to_string(partition_storage->getTableInfo().replica_info.count))); fmt_buf.append("/"); } diff --git a/dbms/src/Debug/dbgNaturalDag.cpp b/dbms/src/Debug/dbgNaturalDag.cpp index 46adf58bba9..8af9acb6446 100644 --- a/dbms/src/Debug/dbgNaturalDag.cpp +++ b/dbms/src/Debug/dbgNaturalDag.cpp @@ -146,7 +146,7 @@ void NaturalDag::loadTables(const NaturalDag::JSONObjectPtr & obj) table.id = id; auto tbl_json = td_json->getObject(std::to_string(id)); auto meta_json = tbl_json->getObject(TABLE_META); - table.meta = TiDB::TableInfo(meta_json); + table.meta = TiDB::TableInfo(meta_json, NullspaceID); auto regions_json = tbl_json->getArray(TABLE_REGIONS); for (const auto & region_json : *regions_json) { @@ -208,7 +208,7 @@ void NaturalDag::buildTables(Context & context) auto & table = it.second; auto meta = table.meta; MockTiDB::instance().addTable(db_name, std::move(meta)); - schema_syncer->syncSchemas(context); + schema_syncer->syncSchemas(context, NullspaceID); for (auto & region : table.regions) { metapb::Region region_pb; @@ -243,7 +243,7 @@ void NaturalDag::buildDatabase(Context & context, SchemaSyncerPtr & schema_synce MockTiDB::instance().dropDB(context, db_name, true); } MockTiDB::instance().newDataBase(db_name); - schema_syncer->syncSchemas(context); + schema_syncer->syncSchemas(context, NullspaceID); } void NaturalDag::build(Context & context) diff --git a/dbms/src/Debug/dbgQueryExecutor.cpp b/dbms/src/Debug/dbgQueryExecutor.cpp index 2902c950f1c..918e79effb3 100644 --- a/dbms/src/Debug/dbgQueryExecutor.cpp +++ b/dbms/src/Debug/dbgQueryExecutor.cpp @@ -181,7 +181,7 @@ BlockInputStreamPtr executeMPPQuery(Context & context, const DAGProperties & pro for (const auto & partition : table_info->partition.definitions) { const auto partition_id = partition.id; - auto regions = context.getTMTContext().getRegionTable().getRegionsByTable(partition_id); + auto regions = context.getTMTContext().getRegionTable().getRegionsByTable(NullspaceID, partition_id); for (size_t i = 0; i < regions.size(); ++i) { if ((current_region_size + i) % properties.mpp_partition_num != static_cast(task.partition_id)) @@ -202,7 +202,7 @@ BlockInputStreamPtr executeMPPQuery(Context & context, const DAGProperties & pro } else { - auto regions = context.getTMTContext().getRegionTable().getRegionsByTable(table_id); + auto regions = context.getTMTContext().getRegionTable().getRegionsByTable(NullspaceID, table_id); if (regions.size() < static_cast(properties.mpp_partition_num)) throw Exception("Not supported: table region num less than mpp partition num"); for (size_t i = 0; i < regions.size(); ++i) @@ -229,7 +229,7 @@ BlockInputStreamPtr executeNonMPPQuery(Context & context, RegionID region_id, co RegionPtr region; if (region_id == InvalidRegionID) { - auto regions = context.getTMTContext().getRegionTable().getRegionsByTable(table_id); + auto regions = context.getTMTContext().getRegionTable().getRegionsByTable(NullspaceID, table_id); if (regions.empty()) throw Exception("No region for table", ErrorCodes::BAD_ARGUMENTS); region = regions[0].second; @@ -307,7 +307,7 @@ tipb::SelectResponse executeDAGRequest(Context & context, const tipb::DAGRequest table_regions_info.local_regions.emplace(region_id, RegionInfo(region_id, region_version, region_conf_version, std::move(key_ranges), nullptr)); - DAGContext dag_context(dag_request, std::move(tables_regions_info), "", false, log); + DAGContext dag_context(dag_request, std::move(tables_regions_info), NullspaceID, "", false, log); context.setDAGContext(&dag_context); DAGDriver driver(context, start_ts, DEFAULT_UNSPECIFIED_SCHEMA_VERSION, &dag_response, true); @@ -335,7 +335,7 @@ bool runAndCompareDagReq(const coprocessor::Request & req, const coprocessor::Re auto & table_regions_info = tables_regions_info.getSingleTableRegions(); table_regions_info.local_regions.emplace(region_id, RegionInfo(region_id, region->version(), region->confVer(), std::move(key_ranges), nullptr)); - DAGContext dag_context(dag_request, std::move(tables_regions_info), "", false, log); + DAGContext dag_context(dag_request, std::move(tables_regions_info), NullspaceID, "", false, log); context.setDAGContext(&dag_context); DAGDriver driver(context, properties.start_ts, DEFAULT_UNSPECIFIED_SCHEMA_VERSION, &dag_response, true); driver.execute(); diff --git a/dbms/src/Debug/dbgTools.cpp b/dbms/src/Debug/dbgTools.cpp index 854d8a18bd5..04336b71a24 100644 --- a/dbms/src/Debug/dbgTools.cpp +++ b/dbms/src/Debug/dbgTools.cpp @@ -549,7 +549,7 @@ Int64 concurrentRangeOperate( { TMTContext & tmt = context.getTMTContext(); - for (auto && [_, r] : tmt.getRegionTable().getRegionsByTable(table_info.id)) + for (auto && [_, r] : tmt.getRegionTable().getRegionsByTable(NullspaceID, table_info.id)) { std::ignore = _; if (r == nullptr) diff --git a/dbms/src/Flash/BatchCoprocessorHandler.cpp b/dbms/src/Flash/BatchCoprocessorHandler.cpp index da5d08f3c45..0cf10910af1 100644 --- a/dbms/src/Flash/BatchCoprocessorHandler.cpp +++ b/dbms/src/Flash/BatchCoprocessorHandler.cpp @@ -73,6 +73,7 @@ grpc::Status BatchCoprocessorHandler::execute() DAGContext dag_context( dag_request, std::move(tables_regions_info), + cop_request->context().keyspace_id(), cop_context.db_context.getClientInfo().current_address.toString(), /*is_batch_cop=*/true, Logger::get("BatchCoprocessorHandler")); diff --git a/dbms/src/Flash/Coprocessor/DAGContext.cpp b/dbms/src/Flash/Coprocessor/DAGContext.cpp index a13416c436e..1ca227f6a08 100644 --- a/dbms/src/Flash/Coprocessor/DAGContext.cpp +++ b/dbms/src/Flash/Coprocessor/DAGContext.cpp @@ -36,7 +36,7 @@ bool strictSqlMode(UInt64 sql_mode) } // for non-mpp(cop/batchCop) -DAGContext::DAGContext(const tipb::DAGRequest & dag_request_, TablesRegionsInfo && tables_regions_info_, const String & tidb_host_, bool is_batch_cop_, LoggerPtr log_) +DAGContext::DAGContext(const tipb::DAGRequest & dag_request_, TablesRegionsInfo && tables_regions_info_, KeyspaceID keyspace_id_, const String & tidb_host_, bool is_batch_cop_, LoggerPtr log_) : dag_request(&dag_request_) , dummy_query_string(dag_request->DebugString()) , dummy_ast(makeDummyQuery()) @@ -52,6 +52,7 @@ DAGContext::DAGContext(const tipb::DAGRequest & dag_request_, TablesRegionsInfo , max_recorded_error_count(getMaxErrorCount(*dag_request)) , warnings(max_recorded_error_count) , warning_count(0) + , keyspace_id(keyspace_id_) { RUNTIME_CHECK((dag_request->executors_size() > 0) != dag_request->has_root_executor()); const auto & root_executor = dag_request->has_root_executor() @@ -79,6 +80,7 @@ DAGContext::DAGContext(const tipb::DAGRequest & dag_request_, const mpp::TaskMet , max_recorded_error_count(getMaxErrorCount(*dag_request)) , warnings(max_recorded_error_count) , warning_count(0) + , keyspace_id(meta_.keyspace_id()) { RUNTIME_CHECK(dag_request->has_root_executor() && dag_request->root_executor().has_executor_id()); root_executor_id = dag_request->root_executor().executor_id(); diff --git a/dbms/src/Flash/Coprocessor/DAGContext.h b/dbms/src/Flash/Coprocessor/DAGContext.h index 7dba23447c9..6284f2fbd81 100644 --- a/dbms/src/Flash/Coprocessor/DAGContext.h +++ b/dbms/src/Flash/Coprocessor/DAGContext.h @@ -128,7 +128,7 @@ class DAGContext { public: // for non-mpp(cop/batchCop) - DAGContext(const tipb::DAGRequest & dag_request_, TablesRegionsInfo && tables_regions_info_, const String & tidb_host_, bool is_batch_cop_, LoggerPtr log_); + DAGContext(const tipb::DAGRequest & dag_request_, TablesRegionsInfo && tables_regions_info_, KeyspaceID keyspace_id_, const String & tidb_host_, bool is_batch_cop_, LoggerPtr log_); // for mpp DAGContext(const tipb::DAGRequest & dag_request_, const mpp::TaskMeta & meta_, bool is_root_mpp_task_); @@ -272,6 +272,7 @@ class DAGContext void addTableLock(const TableLockHolder & lock) { table_locks.push_back(lock); } + KeyspaceID getKeyspaceID() const { return keyspace_id; } String getRootExecutorId(); const tipb::DAGRequest * dag_request; @@ -362,6 +363,9 @@ class DAGContext // In disaggregated tiflash mode, table_scan in tiflash_compute node will be converted ExchangeReceiver. // Record here so we can add to receiver_set and cancel/close it. std::optional> disaggregated_compute_exchange_receiver; + + // The keyspace that the DAG request from + const KeyspaceID keyspace_id = NullspaceID; }; } // namespace DB diff --git a/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp b/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp index 66755e8094a..d3a7ab3b452 100644 --- a/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp +++ b/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp @@ -471,7 +471,7 @@ std::vector DAGStorageInterpreter::buildCopTasks( std::multimap meta_data; meta_data.emplace("is_remote_read", "true"); - auto tasks = pingcap::coprocessor::buildCopTasks(bo, cluster, remote_request.key_ranges, req, store_type, &Poco::Logger::get("pingcap/coprocessor"), std::move(meta_data), [&] { + auto tasks = pingcap::coprocessor::buildCopTasks(bo, cluster, remote_request.key_ranges, req, store_type, dagContext().getKeyspaceID(), &Poco::Logger::get("pingcap/coprocessor"), std::move(meta_data), [&] { GET_METRIC(tiflash_coprocessor_request_count, type_remote_read_sent).Increment(); }); all_tasks.insert(all_tasks.end(), tasks.begin(), tasks.end()); @@ -842,10 +842,11 @@ void DAGStorageInterpreter::buildLocalStreams(DAGPipeline & pipeline, size_t max std::unordered_map DAGStorageInterpreter::getAndLockStorages(Int64 query_schema_version) { + auto keyspace_id = context.getDAGContext()->getKeyspaceID(); std::unordered_map storages_with_lock; if (unlikely(query_schema_version == DEFAULT_UNSPECIFIED_SCHEMA_VERSION)) { - auto logical_table_storage = tmt.getStorages().get(logical_table_id); + auto logical_table_storage = tmt.getStorages().get(keyspace_id, logical_table_id); if (!logical_table_storage) { throw TiFlashException(fmt::format("Table {} doesn't exist.", logical_table_id), Errors::Table::NotExists); @@ -855,7 +856,7 @@ std::unordered_map DAG { for (auto const physical_table_id : table_scan.getPhysicalTableIDs()) { - auto physical_table_storage = tmt.getStorages().get(physical_table_id); + auto physical_table_storage = tmt.getStorages().get(keyspace_id, physical_table_id); if (!physical_table_storage) { throw TiFlashException(fmt::format("Table {} doesn't exist.", physical_table_id), Errors::Table::NotExists); @@ -866,14 +867,14 @@ std::unordered_map DAG return storages_with_lock; } - auto global_schema_version = tmt.getSchemaSyncer()->getCurrentVersion(); + auto global_schema_version = tmt.getSchemaSyncer()->getCurrentVersion(keyspace_id); /// Align schema version under the read lock. /// Return: [storage, table_structure_lock, storage_schema_version, ok] auto get_and_lock_storage = [&](bool schema_synced, TableID table_id) -> std::tuple { /// Get storage in case it's dropped then re-created. // If schema synced, call getTable without try, leading to exception on table not existing. - auto table_store = tmt.getStorages().get(table_id); + auto table_store = tmt.getStorages().get(keyspace_id, table_id); if (!table_store) { if (schema_synced) @@ -968,7 +969,7 @@ std::unordered_map DAG auto sync_schema = [&] { auto start_time = Clock::now(); GET_METRIC(tiflash_schema_trigger_count, type_cop_read).Increment(); - tmt.getSchemaSyncer()->syncSchemas(context); + tmt.getSchemaSyncer()->syncSchemas(context, dagContext().getKeyspaceID()); auto schema_sync_cost = std::chrono::duration_cast(Clock::now() - start_time).count(); LOG_INFO(log, "Table {} schema sync cost {}ms.", logical_table_id, schema_sync_cost); }; diff --git a/dbms/src/Flash/CoprocessorHandler.cpp b/dbms/src/Flash/CoprocessorHandler.cpp index 29454fc428f..9533a11ff34 100644 --- a/dbms/src/Flash/CoprocessorHandler.cpp +++ b/dbms/src/Flash/CoprocessorHandler.cpp @@ -110,6 +110,7 @@ grpc::Status CoprocessorHandler::execute() DAGContext dag_context( dag_request, std::move(tables_regions_info), + cop_request->context().keyspace_id(), cop_context.db_context.getClientInfo().current_address.toString(), /*is_batch_cop=*/false, Logger::get("CoprocessorHandler")); diff --git a/dbms/src/Flash/Management/ManualCompact.cpp b/dbms/src/Flash/Management/ManualCompact.cpp index c79bebdcc2c..006e53ebd8e 100644 --- a/dbms/src/Flash/Management/ManualCompact.cpp +++ b/dbms/src/Flash/Management/ManualCompact.cpp @@ -38,11 +38,12 @@ ManualCompactManager::ManualCompactManager(const Context & global_context_, cons grpc::Status ManualCompactManager::handleRequest(const ::kvrpcpb::CompactRequest * request, ::kvrpcpb::CompactResponse * response) { + auto ks_tbl_id = KeyspaceTableID{request->keyspace_id(), request->logical_table_id()}; { std::lock_guard lock(mutex); // Check whether there are duplicated executions. - if (unsync_active_logical_table_ids.count(request->logical_table_id())) + if (unsync_active_logical_table_ids.count(ks_tbl_id)) { response->mutable_error()->mutable_err_compact_in_progress(); response->set_has_remaining(false); @@ -57,12 +58,12 @@ grpc::Status ManualCompactManager::handleRequest(const ::kvrpcpb::CompactRequest return grpc::Status::OK; } - unsync_active_logical_table_ids.insert(request->logical_table_id()); + unsync_active_logical_table_ids.insert(ks_tbl_id); unsync_running_or_pending_tasks++; } SCOPE_EXIT({ std::lock_guard lock(mutex); - unsync_active_logical_table_ids.erase(request->logical_table_id()); + unsync_active_logical_table_ids.erase(ks_tbl_id); unsync_running_or_pending_tasks--; }); @@ -100,7 +101,7 @@ grpc::Status ManualCompactManager::doWorkWithCatch(const ::kvrpcpb::CompactReque grpc::Status ManualCompactManager::doWork(const ::kvrpcpb::CompactRequest * request, ::kvrpcpb::CompactResponse * response) { const auto & tmt_context = global_context.getTMTContext(); - auto storage = tmt_context.getStorages().get(request->physical_table_id()); + auto storage = tmt_context.getStorages().get(request->keyspace_id(), request->physical_table_id()); if (storage == nullptr) { response->mutable_error()->mutable_err_physical_table_not_exist(); @@ -162,6 +163,7 @@ grpc::Status ManualCompactManager::doWork(const ::kvrpcpb::CompactRequest * requ Stopwatch timer; + // TODO(iosmanthus): attach keyspace id for this logger. LOG_INFO(log, "Manual compaction begin for table {}, start_key = {}", request->physical_table_id(), start_key.toDebugString()); // Repeatedly merge multiple segments as much as possible. diff --git a/dbms/src/Flash/Management/ManualCompact.h b/dbms/src/Flash/Management/ManualCompact.h index 14d8abe8837..b415705389c 100644 --- a/dbms/src/Flash/Management/ManualCompact.h +++ b/dbms/src/Flash/Management/ManualCompact.h @@ -15,6 +15,7 @@ #include #include +#include #include #include #pragma GCC diagnostic push @@ -78,7 +79,7 @@ class ManualCompactManager : private boost::noncopyable /// When there is a task containing the same logical_table running, /// the task will be rejected. - std::set unsync_active_logical_table_ids = {}; + std::unordered_set> unsync_active_logical_table_ids = {}; size_t unsync_running_or_pending_tasks = 0; diff --git a/dbms/src/Interpreters/InterpreterSelectQuery.cpp b/dbms/src/Interpreters/InterpreterSelectQuery.cpp index 44e23e8ee43..48cf4007d95 100644 --- a/dbms/src/Interpreters/InterpreterSelectQuery.cpp +++ b/dbms/src/Interpreters/InterpreterSelectQuery.cpp @@ -225,7 +225,7 @@ void InterpreterSelectQuery::getAndLockStorageWithSchemaVersion(const String & d const String qualified_name = database_name + "." + table_name; /// Get current schema version in schema syncer for a chance to shortcut. - const auto global_schema_version = context.getTMTContext().getSchemaSyncer()->getCurrentVersion(); + const auto global_schema_version = context.getTMTContext().getSchemaSyncer()->getCurrentVersion(NullspaceID); /// Lambda for get storage, then align schema version under the read lock. auto get_and_lock_storage = [&](bool schema_synced) -> std::tuple { @@ -296,7 +296,10 @@ void InterpreterSelectQuery::getAndLockStorageWithSchemaVersion(const String & d { log_schema_version("not OK, syncing schemas."); auto start_time = Clock::now(); - context.getTMTContext().getSchemaSyncer()->syncSchemas(context); + // Since InterpreterSelectQuery will only be trigger while using ClickHouse client, + // and we do not support keyspace feature for ClickHouse interface, + // we could use nullspace id here safely. + context.getTMTContext().getSchemaSyncer()->syncSchemas(context, NullspaceID); auto schema_sync_cost = std::chrono::duration_cast(Clock::now() - start_time).count(); LOG_DEBUG(log, "Table {} schema sync cost {}ms.", qualified_name, schema_sync_cost); diff --git a/dbms/src/Server/Server.cpp b/dbms/src/Server/Server.cpp index 46f9ccf7ef7..a2a69bfab1a 100644 --- a/dbms/src/Server/Server.cpp +++ b/dbms/src/Server/Server.cpp @@ -309,7 +309,7 @@ struct TiFlashProxyConfig const std::string TiFlashProxyConfig::config_prefix = "flash.proxy"; -pingcap::ClusterConfig getClusterConfig(TiFlashSecurityConfigPtr security_config, const TiFlashRaftConfig & raft_config, const LoggerPtr & log) +pingcap::ClusterConfig getClusterConfig(TiFlashSecurityConfigPtr security_config, const TiFlashRaftConfig & raft_config, const int api_version, const LoggerPtr & log) { pingcap::ClusterConfig config; config.tiflash_engine_key = raft_config.engine_key; @@ -318,7 +318,18 @@ pingcap::ClusterConfig getClusterConfig(TiFlashSecurityConfigPtr security_config config.ca_path = ca_path; config.cert_path = cert_path; config.key_path = key_path; - LOG_INFO(log, "update cluster config, ca_path: {}, cert_path: {}, key_path: {}", ca_path, cert_path, key_path); + switch (api_version) + { + case 1: + config.api_version = kvrpcpb::APIVersion::V1; + break; + case 2: + config.api_version = kvrpcpb::APIVersion::V2; + break; + default: + throw Exception(ErrorCodes::INVALID_CONFIG_PARAMETER, "Invalid api version {}", api_version); + } + LOG_INFO(log, "update cluster config, ca_path: {}, cert_path: {}, key_path: {}, api_version: {}", ca_path, cert_path, key_path, config.api_version); return config; } @@ -887,6 +898,8 @@ int Server::main(const std::vector & /*args*/) LOG_INFO(log, "Using format_version={} (default settings).", STORAGE_FORMAT_CURRENT.identifier); } + LOG_INFO(log, "Using api_version={}", storage_config.api_version); + // Init Proxy's config TiFlashProxyConfig proxy_conf(config()); EngineStoreServerWrap tiflash_instance_wrap{}; @@ -1197,7 +1210,7 @@ int Server::main(const std::vector & /*args*/) if (updated) { auto raft_config = TiFlashRaftConfig::parseSettings(*config, log); - auto cluster_config = getClusterConfig(global_context->getSecurityConfig(), raft_config, log); + auto cluster_config = getClusterConfig(global_context->getSecurityConfig(), raft_config, storage_config.api_version, log); global_context->getTMTContext().updateSecurityConfig(std::move(raft_config), std::move(cluster_config)); LOG_DEBUG(log, "TMTContext updated security config"); } @@ -1254,7 +1267,7 @@ int Server::main(const std::vector & /*args*/) { /// create TMTContext - auto cluster_config = getClusterConfig(global_context->getSecurityConfig(), raft_config, log); + auto cluster_config = getClusterConfig(global_context->getSecurityConfig(), raft_config, storage_config.api_version, log); global_context->createTMTContext(raft_config, std::move(cluster_config)); global_context->getTMTContext().reloadConfig(config()); } @@ -1276,26 +1289,30 @@ int Server::main(const std::vector & /*args*/) if (!global_context->getSharedContextDisagg()->isDisaggregatedComputeMode()) { /// Then, sync schemas with TiDB, and initialize schema sync service. - for (int i = 0; i < 60; i++) // retry for 3 mins + /// If in API V2 mode, each keyspace's schema is fetch lazily. + if (storage_config.api_version == 1) { - try + for (int i = 0; i < 60; i++) // retry for 3 mins { - global_context->getTMTContext().getSchemaSyncer()->syncSchemas(*global_context); - break; - } - catch (Poco::Exception & e) - { - const int wait_seconds = 3; - LOG_ERROR( - log, - "Bootstrap failed because sync schema error: {}\nWe will sleep for {}" - " seconds and try again.", - e.displayText(), - wait_seconds); - ::sleep(wait_seconds); + try + { + global_context->getTMTContext().getSchemaSyncer()->syncSchemas(*global_context, NullspaceID); + break; + } + catch (Poco::Exception & e) + { + const int wait_seconds = 3; + LOG_ERROR( + log, + "Bootstrap failed because sync schema error: {}\nWe will sleep for {}" + " seconds and try again.", + e.displayText(), + wait_seconds); + ::sleep(wait_seconds); + } } + LOG_DEBUG(log, "Sync schemas done."); } - LOG_DEBUG(log, "Sync schemas done."); initStores(*global_context, log, storage_config.lazily_init_store); diff --git a/dbms/src/Server/StorageConfigParser.cpp b/dbms/src/Server/StorageConfigParser.cpp index ad90a48db66..ef4fff41c92 100644 --- a/dbms/src/Server/StorageConfigParser.cpp +++ b/dbms/src/Server/StorageConfigParser.cpp @@ -210,6 +210,11 @@ void TiFlashStorageConfig::parseMisc(const String & storage_section, const Logge format_version = *version; } + if (auto version = table->get_qualified_as("api_version"); version) + { + api_version = *version; + } + auto get_bool_config_or_default = [&](const String & name, bool default_value) { if (auto value = table->get_qualified_as(name); value) { diff --git a/dbms/src/Server/StorageConfigParser.h b/dbms/src/Server/StorageConfigParser.h index 70740891f58..0f94bde6bcf 100644 --- a/dbms/src/Server/StorageConfigParser.h +++ b/dbms/src/Server/StorageConfigParser.h @@ -137,6 +137,7 @@ struct TiFlashStorageConfig UInt64 format_version = 0; bool lazily_init_store = true; + UInt64 api_version = 1; StorageS3Config s3_config; StorageRemoteCacheConfig remote_cache_config; diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h index 623c687e258..f909f80778e 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h @@ -442,8 +442,14 @@ class DeltaMergeStore : private boost::noncopyable StoreStats getStoreStats(); SegmentsStats getSegmentsStats(); - bool isCommonHandle() const { return is_common_handle; } - size_t getRowKeyColumnSize() const { return rowkey_column_size; } + bool isCommonHandle() const + { + return is_common_handle; + } + size_t getRowKeyColumnSize() const + { + return rowkey_column_size; + } public: /// Methods mainly used by region split. diff --git a/dbms/src/Storages/DeltaMerge/RowKeyRange.cpp b/dbms/src/Storages/DeltaMerge/RowKeyRange.cpp index 9e2ea7ffbad..d7de4b701c8 100644 --- a/dbms/src/Storages/DeltaMerge/RowKeyRange.cpp +++ b/dbms/src/Storages/DeltaMerge/RowKeyRange.cpp @@ -55,18 +55,19 @@ RowKeyValue RowKeyValueRef::toRowKeyValue() const } } -std::unordered_map RowKeyRange::table_min_max_data; +std::unordered_map> RowKeyRange::table_min_max_data; std::shared_mutex RowKeyRange::table_mutex; -const RowKeyRange::TableRangeMinMax & RowKeyRange::getTableMinMaxData(TableID table_id, bool is_common_handle) +const RowKeyRange::TableRangeMinMax & RowKeyRange::getTableMinMaxData(KeyspaceID keyspace_id, TableID table_id, bool is_common_handle) { + auto keyspace_table_id = KeyspaceTableID{keyspace_id, table_id}; { std::shared_lock lock(table_mutex); - if (auto it = table_min_max_data.find(table_id); it != table_min_max_data.end()) + if (auto it = table_min_max_data.find(keyspace_table_id); it != table_min_max_data.end()) return it->second; } std::unique_lock lock(table_mutex); - return table_min_max_data.try_emplace(table_id, table_id, is_common_handle).first->second; + return table_min_max_data.try_emplace(keyspace_table_id, keyspace_id, table_id, is_common_handle).first->second; } template diff --git a/dbms/src/Storages/DeltaMerge/RowKeyRange.h b/dbms/src/Storages/DeltaMerge/RowKeyRange.h index 3213bc3316e..46e2024565d 100644 --- a/dbms/src/Storages/DeltaMerge/RowKeyRange.h +++ b/dbms/src/Storages/DeltaMerge/RowKeyRange.h @@ -416,9 +416,11 @@ struct RowKeyRange HandleValuePtr min; HandleValuePtr max; - TableRangeMinMax(TableID table_id, bool is_common_handle) + TableRangeMinMax(KeyspaceID keyspace_id, TableID table_id, bool is_common_handle) { WriteBufferFromOwnString ss; + auto ks_pfx = DecodedTiKVKey::makeKeyspacePrefix(keyspace_id); + ss.write(ks_pfx.data(), ks_pfx.size()); ss.write('t'); EncodeInt64(table_id, ss); ss.write('_'); @@ -438,9 +440,9 @@ struct RowKeyRange }; /// maybe use a LRU cache in case there are massive tables - static std::unordered_map table_min_max_data; + static std::unordered_map> table_min_max_data; static std::shared_mutex table_mutex; - static const TableRangeMinMax & getTableMinMaxData(TableID table_id, bool is_common_handle); + static const TableRangeMinMax & getTableMinMaxData(KeyspaceID keyspace_id, TableID table_id, bool is_common_handle); RowKeyRange(const RowKeyValue & start_, const RowKeyValue & end_, bool is_common_handle_, size_t rowkey_column_size_) : is_common_handle(is_common_handle_) @@ -753,7 +755,8 @@ struct RowKeyRange { auto & start_key = *raw_keys.first; auto & end_key = *raw_keys.second; - const auto & table_range_min_max = getTableMinMaxData(table_id, is_common_handle); + auto keyspace_id = start_key.getKeyspaceID(); + const auto & table_range_min_max = getTableMinMaxData(keyspace_id, table_id, is_common_handle); RowKeyValue start_value, end_value; if (start_key.compare(*table_range_min_max.min) <= 0) { @@ -765,7 +768,7 @@ struct RowKeyRange else { start_value = RowKeyValue(is_common_handle, - std::make_shared(start_key.begin() + RecordKVFormat::RAW_KEY_NO_HANDLE_SIZE, start_key.end())); + std::make_shared(RecordKVFormat::getRawTiDBPKView(start_key))); } if (end_key.compare(*table_range_min_max.max) >= 0) { @@ -776,7 +779,7 @@ struct RowKeyRange } else end_value = RowKeyValue(is_common_handle, - std::make_shared(end_key.begin() + RecordKVFormat::RAW_KEY_NO_HANDLE_SIZE, end_key.end())); + std::make_shared(RecordKVFormat::getRawTiDBPKView(end_key))); return RowKeyRange(start_value, end_value, is_common_handle, rowkey_column_size); } else diff --git a/dbms/src/Storages/GCManager.cpp b/dbms/src/Storages/GCManager.cpp index 640c2505939..8ee4b7b0129 100644 --- a/dbms/src/Storages/GCManager.cpp +++ b/dbms/src/Storages/GCManager.cpp @@ -44,15 +44,15 @@ bool GCManager::work() return false; } - LOG_DEBUG(log, "Start GC with table id: {}", next_table_id); + LOG_DEBUG(log, "Start GC with keyspace id: {}, table id: {}", next_keyspace_table_id.first, next_keyspace_table_id.second); // Get a storage snapshot with weak_ptrs first // TODO: avoid gc on storage which have no data? - std::map> storages; - for (const auto & [table_id, storage] : global_context.getTMTContext().getStorages().getAllStorage()) - storages.emplace(table_id, storage); + std::map> storages; + for (const auto & [ks_tbl_id, storage] : global_context.getTMTContext().getStorages().getAllStorage()) + storages.emplace(ks_tbl_id, storage); auto iter = storages.begin(); - if (next_table_id != InvalidTableID) - iter = storages.lower_bound(next_table_id); + if (next_keyspace_table_id != KeyspaceTableID{NullspaceID, InvalidTableID}) + iter = storages.lower_bound(next_keyspace_table_id); UInt64 checked_storage_num = 0; while (true) @@ -74,13 +74,15 @@ bool GCManager::work() try { + auto keyspace_id = storage->getTableInfo().keyspace_id; + auto ks_log = log->getChild(fmt::format("keyspace={}", keyspace_id)); TableLockHolder table_read_lock = storage->lockForShare(RWLock::NO_QUERY); // Block this thread and do GC on the storage - // It is OK if any schema changes is apply to the storage while doing GC, so we + // It is OK if any schema changes is applied to the storage while doing GC, so we // do not acquire structure lock on the storage. auto gc_segments_num = storage->onSyncGc(gc_segments_limit, DM::GCOptions::newAll()); gc_segments_limit = gc_segments_limit - gc_segments_num; - LOG_TRACE(log, "GCManager gc {} segments of table {}", gc_segments_num, storage->getTableInfo().id); + LOG_TRACE(ks_log, "GCManager gc {} segments of table {}", gc_segments_num, storage->getTableInfo().id); // Reach the limit on the number of segments to be gc, stop here if (gc_segments_limit <= 0) break; @@ -98,8 +100,10 @@ bool GCManager::work() } if (iter == storages.end()) iter = storages.begin(); - next_table_id = iter->first; - LOG_DEBUG(log, "End GC and next gc will start with table id: {}", next_table_id); + + if (iter != storages.end()) + next_keyspace_table_id = iter->first; + LOG_DEBUG(log, "End GC and next gc will start with keyspace {}, table id: {}", next_keyspace_table_id.first, next_keyspace_table_id.second); gc_check_stop_watch.restart(); // Always return false return false; diff --git a/dbms/src/Storages/GCManager.h b/dbms/src/Storages/GCManager.h index d2f4f164978..e2522daec99 100644 --- a/dbms/src/Storages/GCManager.h +++ b/dbms/src/Storages/GCManager.h @@ -35,7 +35,7 @@ class GCManager private: Context & global_context; - TableID next_table_id = InvalidTableID; + KeyspaceTableID next_keyspace_table_id = KeyspaceTableID{NullspaceID, InvalidTableID}; AtomicStopwatch gc_check_stop_watch; diff --git a/dbms/src/Storages/StorageDeltaMerge.cpp b/dbms/src/Storages/StorageDeltaMerge.cpp index 5dd4a406ddc..5f245516973 100644 --- a/dbms/src/Storages/StorageDeltaMerge.cpp +++ b/dbms/src/Storages/StorageDeltaMerge.cpp @@ -1633,8 +1633,10 @@ void StorageDeltaMerge::removeFromTMTContext() { // remove this table from TMTContext TMTContext & tmt_context = global_context.getTMTContext(); - tmt_context.getStorages().remove(tidb_table_info.id); - tmt_context.getRegionTable().removeTable(tidb_table_info.id); + auto keyspace_id = tidb_table_info.keyspace_id; + auto table_id = tidb_table_info.id; + tmt_context.getStorages().remove(keyspace_id, table_id); + tmt_context.getRegionTable().removeTable(keyspace_id, table_id); } StorageDeltaMerge::~StorageDeltaMerge() diff --git a/dbms/src/Storages/StorageDisaggregated.cpp b/dbms/src/Storages/StorageDisaggregated.cpp index 8373ec03c6a..4b3faac7e49 100644 --- a/dbms/src/Storages/StorageDisaggregated.cpp +++ b/dbms/src/Storages/StorageDisaggregated.cpp @@ -131,6 +131,8 @@ StorageDisaggregated::RequestAndRegionIDs StorageDisaggregated::buildDispatchMPP { auto dispatch_req = std::make_shared<::mpp::DispatchTaskRequest>(); ::mpp::TaskMeta * dispatch_req_meta = dispatch_req->mutable_meta(); + // TODO(iosmanthus): support S3 remote read in keyspace mode. + dispatch_req_meta->set_keyspace_id(context.getDAGContext()->getKeyspaceID()); dispatch_req_meta->set_start_ts(sender_target_mpp_task_id.query_id.start_ts); dispatch_req_meta->set_query_ts(sender_target_mpp_task_id.query_id.query_ts); dispatch_req_meta->set_local_query_id(sender_target_mpp_task_id.query_id.local_query_id); diff --git a/dbms/src/Storages/Transaction/ApplySnapshot.cpp b/dbms/src/Storages/Transaction/ApplySnapshot.cpp index 3591e49900c..6d7d88a1426 100644 --- a/dbms/src/Storages/Transaction/ApplySnapshot.cpp +++ b/dbms/src/Storages/Transaction/ApplySnapshot.cpp @@ -119,7 +119,8 @@ void KVStore::checkAndApplyPreHandledSnapshot(const RegionPtrWrap & new_region, { auto table_id = new_region->getMappedTableID(); - if (auto storage = tmt.getStorages().get(table_id); storage) + auto keyspace_id = new_region->getKeyspaceID(); + if (auto storage = tmt.getStorages().get(keyspace_id, table_id); storage) { switch (storage->engineType()) { @@ -142,10 +143,11 @@ template void KVStore::onSnapshot(const RegionPtrWrap & new_region_wrap, RegionPtr old_region, UInt64 old_region_index, TMTContext & tmt) { RegionID region_id = new_region_wrap->id(); + auto keyspace_id = new_region_wrap->getKeyspaceID(); { auto table_id = new_region_wrap->getMappedTableID(); - if (auto storage = tmt.getStorages().get(table_id); storage && storage->engineType() == TiDB::StorageEngine::DT) + if (auto storage = tmt.getStorages().get(keyspace_id, table_id); storage && storage->engineType() == TiDB::StorageEngine::DT) { try { @@ -292,6 +294,7 @@ std::vector KVStore::preHandleSSTsToDTFiles( TMTContext & tmt) { auto context = tmt.getContext(); + auto keyspace_id = new_region->getKeyspaceID(); bool force_decode = false; size_t expected_block_size = DEFAULT_MERGE_BLOCK_SIZE; @@ -383,7 +386,7 @@ std::vector KVStore::preHandleSSTsToDTFiles( // Update schema and try to decode again LOG_INFO(log, "Decoding Region snapshot data meet error, sync schema and try to decode again {} [error={}]", new_region->toString(true), e.displayText()); GET_METRIC(tiflash_schema_trigger_count, type_raft_decode).Increment(); - tmt.getSchemaSyncer()->syncSchemas(context); + tmt.getSchemaSyncer()->syncSchemas(context, keyspace_id); // Next time should force_decode force_decode = true; @@ -557,7 +560,8 @@ RegionPtr KVStore::handleIngestSSTByDTFile(const RegionPtr & region, const SSTVi if (!external_files.empty()) { auto table_id = region->getMappedTableID(); - if (auto storage = tmt.getStorages().get(table_id); storage) + auto keyspace_id = region->getKeyspaceID(); + if (auto storage = tmt.getStorages().get(keyspace_id, table_id); storage) { // Ingest DTFiles into DeltaMerge storage auto & context = tmt.getContext(); diff --git a/dbms/src/Storages/Transaction/KVStore.cpp b/dbms/src/Storages/Transaction/KVStore.cpp index 400f56272c4..0d5280ec5d8 100644 --- a/dbms/src/Storages/Transaction/KVStore.cpp +++ b/dbms/src/Storages/Transaction/KVStore.cpp @@ -136,8 +136,9 @@ void KVStore::traverseRegions(std::function & bool KVStore::tryFlushRegionCacheInStorage(TMTContext & tmt, const Region & region, const LoggerPtr & log, bool try_until_succeed) { fiu_do_on(FailPoints::force_fail_in_flush_region_data, { return false; }); + auto keyspace_id = region.getKeyspaceID(); auto table_id = region.getMappedTableID(); - auto storage = tmt.getStorages().get(table_id); + auto storage = tmt.getStorages().get(keyspace_id, table_id); if (unlikely(storage == nullptr)) { LOG_WARNING(log, diff --git a/dbms/src/Storages/Transaction/KeyspaceSnapshot.cpp b/dbms/src/Storages/Transaction/KeyspaceSnapshot.cpp new file mode 100644 index 00000000000..4e83ea9a60c --- /dev/null +++ b/dbms/src/Storages/Transaction/KeyspaceSnapshot.cpp @@ -0,0 +1,61 @@ +// Copyright 2023 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include + +namespace DB +{ +KeyspaceSnapshot::KeyspaceSnapshot(KeyspaceID keyspace_id_, pingcap::kv::Cluster * cluster_, UInt64 version_) + : snap(cluster_, version_) +{ + if (keyspace_id_ == NullspaceID) + return; + prefix = std::string(KEYSPACE_PREFIX_LEN, 0); + auto id = toBigEndian(keyspace_id_); + memcpy(prefix.data(), reinterpret_cast(&id), sizeof(KeyspaceID)); + prefix[0] = DB::TXN_MODE_PREFIX; +} + +std::string KeyspaceSnapshot::Get(const std::string & key) +{ + auto encoded_key = encodeKey(key); + return snap.Get(encoded_key); +} + +std::string KeyspaceSnapshot::Get(pingcap::kv::Backoffer & bo, const std::string & key) +{ + auto encoded_key = encodeKey(key); + return snap.Get(bo, encoded_key); +} + +KeyspaceScanner KeyspaceSnapshot::Scan(const std::string & begin, const std::string & end) +{ + auto inner = snap.Scan(encodeKey(begin), encodeKey(end)); + return KeyspaceScanner(inner, /* need_cut_ */ !prefix.empty()); +} + +std::string KeyspaceSnapshot::encodeKey(const std::string & key) +{ + return prefix.empty() ? key : prefix + key; +} + +std::string KeyspaceScanner::key() +{ + auto k = Base::key(); + if (need_cut) + k = k.substr(DB::KEYSPACE_PREFIX_LEN); + return k; +} +} // namespace DB \ No newline at end of file diff --git a/dbms/src/Storages/Transaction/KeyspaceSnapshot.h b/dbms/src/Storages/Transaction/KeyspaceSnapshot.h new file mode 100644 index 00000000000..c7580a8e177 --- /dev/null +++ b/dbms/src/Storages/Transaction/KeyspaceSnapshot.h @@ -0,0 +1,63 @@ +// Copyright 2023 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#pragma GCC diagnostic push +#pragma GCC diagnostic ignored "-Wunused-parameter" +#pragma GCC diagnostic ignored "-Wnon-virtual-dtor" +#ifdef __clang__ +#pragma GCC diagnostic ignored "-Wdeprecated-declarations" +#endif +#include +#pragma GCC diagnostic pop + +#include +#include +#include + +namespace DB +{ +struct KeyspaceScanner : public pingcap::kv::Scanner +{ + using Base = pingcap::kv::Scanner; + + KeyspaceScanner(Base scanner_, bool need_cut_) + : Base(scanner_) + , need_cut(need_cut_) + { + } + + std::string key(); + +private: + bool need_cut; +}; + +class KeyspaceSnapshot +{ +public: + using Base = pingcap::kv::Snapshot; + explicit KeyspaceSnapshot(KeyspaceID keyspace_id_, pingcap::kv::Cluster * cluster_, UInt64 version_); + + std::string Get(const std::string & key); + std::string Get(pingcap::kv::Backoffer & bo, const std::string & key); + KeyspaceScanner Scan(const std::string & begin, const std::string & end); + +private: + Base snap; + std::string prefix; + std::string encodeKey(const std::string & key); +}; +} // namespace DB \ No newline at end of file diff --git a/dbms/src/Storages/Transaction/LearnerRead.cpp b/dbms/src/Storages/Transaction/LearnerRead.cpp index 61fd32fa070..2118a67f608 100644 --- a/dbms/src/Storages/Transaction/LearnerRead.cpp +++ b/dbms/src/Storages/Transaction/LearnerRead.cpp @@ -107,7 +107,7 @@ class MvccQueryInfoWrap regions_info_ptr = &*regions_info; // Only for test, because regions_query_info should never be empty if query is from TiDB or TiSpark. // todo support partition table - auto regions = tmt.getRegionTable().getRegionsByTable(logical_table_id); + auto regions = tmt.getRegionTable().getRegionsByTable(NullspaceID, logical_table_id); regions_info_ptr->reserve(regions.size()); for (const auto & [id, region] : regions) { diff --git a/dbms/src/Storages/Transaction/PartitionStreams.cpp b/dbms/src/Storages/Transaction/PartitionStreams.cpp index 6bdf092a00d..d68e4b15e90 100644 --- a/dbms/src/Storages/Transaction/PartitionStreams.cpp +++ b/dbms/src/Storages/Transaction/PartitionStreams.cpp @@ -60,13 +60,14 @@ static void writeRegionDataToStorage( { constexpr auto FUNCTION_NAME = __FUNCTION__; // NOLINT(readability-identifier-naming) const auto & tmt = context.getTMTContext(); + auto keyspace_id = region->getKeyspaceID(); TableID table_id = region->getMappedTableID(); UInt64 region_decode_cost = -1, write_part_cost = -1; /// Declare lambda of atomic read then write to call multiple times. auto atomic_read_write = [&](bool force_decode) { /// Get storage based on table ID. - auto storage = tmt.getStorages().get(table_id); + auto storage = tmt.getStorages().get(keyspace_id, table_id); if (storage == nullptr || storage->isTombstone()) { if (!force_decode) // Need to update. @@ -186,7 +187,7 @@ static void writeRegionDataToStorage( /// If first try failed, sync schema and force read then write. { GET_METRIC(tiflash_schema_trigger_count, type_raft_decode).Increment(); - tmt.getSchemaSyncer()->syncSchemas(context); + tmt.getSchemaSyncer()->syncSchemas(context, keyspace_id); if (!atomic_read_write(true)) { @@ -402,11 +403,12 @@ AtomicGetStorageSchema(const RegionPtr & region, TMTContext & tmt) std::shared_ptr dm_storage; DecodingStorageSchemaSnapshotConstPtr schema_snapshot; + auto keyspace_id = region->getKeyspaceID(); auto table_id = region->getMappedTableID(); LOG_DEBUG(Logger::get(__PRETTY_FUNCTION__), "Get schema for table {}", table_id); auto context = tmt.getContext(); const auto atomic_get = [&](bool force_decode) -> bool { - auto storage = tmt.getStorages().get(table_id); + auto storage = tmt.getStorages().get(keyspace_id, table_id); if (storage == nullptr) { if (!force_decode) @@ -427,7 +429,7 @@ AtomicGetStorageSchema(const RegionPtr & region, TMTContext & tmt) if (!atomic_get(false)) { GET_METRIC(tiflash_schema_trigger_count, type_raft_decode).Increment(); - tmt.getSchemaSyncer()->syncSchemas(context); + tmt.getSchemaSyncer()->syncSchemas(context, keyspace_id); if (!atomic_get(true)) throw Exception("Get " + region->toString() + " belonging table " + DB::toString(table_id) + " is_command_handle fail", diff --git a/dbms/src/Storages/Transaction/ProxyFFIStatusService.cpp b/dbms/src/Storages/Transaction/ProxyFFIStatusService.cpp index f3551bed94e..85b7f5c078c 100644 --- a/dbms/src/Storages/Transaction/ProxyFFIStatusService.cpp +++ b/dbms/src/Storages/Transaction/ProxyFFIStatusService.cpp @@ -20,6 +20,8 @@ #include #include +#include + namespace DB { HttpRequestRes HandleHttpRequestSyncStatus( @@ -31,11 +33,36 @@ HttpRequestRes HandleHttpRequestSyncStatus( { HttpRequestStatus status = HttpRequestStatus::Ok; TableID table_id = 0; + pingcap::pd::KeyspaceID keyspace_id = NullspaceID; { - std::string table_id_str(path.substr(api_name.size())); + auto * log = &Poco::Logger::get("HandleHttpRequestSyncStatus"); + LOG_DEBUG(log, "handling sync status request, path: {}, api_name: {}", path, api_name); + + // Try to handle sync status request with old schema. + // Old schema: /{table_id} + // New schema: /keyspace/{keyspace_id}/table/{table_id} + auto query = path.substr(api_name.size()); + std::vector query_parts; + boost::split(query_parts, query, boost::is_any_of("/")); + if (query_parts.size() != 1 && (query_parts.size() != 4 || query_parts[0] != "keyspace" || query_parts[2] != "table")) + { + LOG_ERROR(log, "invalid SyncStatus request: {}", query); + status = HttpRequestStatus::ErrorParam; + return HttpRequestRes{.status = status, .res = CppStrWithView{.inner = GenRawCppPtr(), .view = BaseBuffView{}}}; + } + + try { - table_id = std::stoll(table_id_str); + if (query_parts.size() == 4) + { + keyspace_id = std::stoll(query_parts[1]); + table_id = std::stoll(query_parts[3]); + } + else + { + table_id = std::stoll(query_parts[0]); + } } catch (...) { @@ -57,10 +84,11 @@ HttpRequestRes HandleHttpRequestSyncStatus( static const std::chrono::minutes PRINT_LOG_INTERVAL = std::chrono::minutes{5}; static Timepoint last_print_log_time = Clock::now(); // if storage is not created in ch, flash replica should not be available. - if (tmt.getStorages().get(table_id)) + // TODO(iosmanthus): TiDB should support tiflash replica. + if (tmt.getStorages().get(keyspace_id, table_id)) { RegionTable & region_table = tmt.getRegionTable(); - region_table.handleInternalRegionsByTable(table_id, [&](const RegionTable::InternalRegions & regions) { + region_table.handleInternalRegionsByTable(keyspace_id, table_id, [&](const RegionTable::InternalRegions & regions) { region_list.reserve(regions.size()); bool can_log = Clock::now() > last_print_log_time + PRINT_LOG_INTERVAL; FmtBuffer lag_regions_log; diff --git a/dbms/src/Storages/Transaction/Region.cpp b/dbms/src/Storages/Transaction/Region.cpp index 56134925ba6..7fee344df79 100644 --- a/dbms/src/Storages/Transaction/Region.cpp +++ b/dbms/src/Storages/Transaction/Region.cpp @@ -778,6 +778,7 @@ Region::Region(DB::RegionMeta && meta_, const TiFlashRaftProxyHelper * proxy_hel : meta(std::move(meta_)) , log(Logger::get()) , mapped_table_id(meta.getRange()->getMappedTableID()) + , keyspace_id(meta.getRange()->getKeyspaceID()) , proxy_helper(proxy_helper_) {} @@ -786,6 +787,11 @@ TableID Region::getMappedTableID() const return mapped_table_id; } +KeyspaceID Region::getKeyspaceID() const +{ + return keyspace_id; +} + void Region::setPeerState(raft_serverpb::PeerState state) { meta.setPeerState(state); diff --git a/dbms/src/Storages/Transaction/Region.h b/dbms/src/Storages/Transaction/Region.h index f4aead264c9..869a2ca44fe 100644 --- a/dbms/src/Storages/Transaction/Region.h +++ b/dbms/src/Storages/Transaction/Region.h @@ -193,6 +193,7 @@ class Region : public std::enable_shared_from_this raft_serverpb::MergeState getMergeState() const; TableID getMappedTableID() const; + KeyspaceID getKeyspaceID() const; EngineStoreApplyRes handleWriteRaftCmd(const WriteCmdsView & cmds, UInt64 index, UInt64 term, TMTContext & tmt); void finishIngestSSTByDTFile(RegionPtr && rhs, UInt64 index, UInt64 term); @@ -231,6 +232,7 @@ class Region : public std::enable_shared_from_this LoggerPtr log; const TableID mapped_table_id; + const KeyspaceID keyspace_id; std::atomic snapshot_event_flag{1}; const TiFlashRaftProxyHelper * proxy_helper{nullptr}; diff --git a/dbms/src/Storages/Transaction/RegionRangeKeys.h b/dbms/src/Storages/Transaction/RegionRangeKeys.h index 4e3c39d3765..7f695ed8114 100644 --- a/dbms/src/Storages/Transaction/RegionRangeKeys.h +++ b/dbms/src/Storages/Transaction/RegionRangeKeys.h @@ -56,11 +56,13 @@ class RegionRangeKeys : boost::noncopyable const std::pair & rawKeys() const; explicit RegionRangeKeys(TiKVKey && start_key, TiKVKey && end_key); TableID getMappedTableID() const; + KeyspaceID getKeyspaceID() const; private: RegionRange ori; std::pair raw; - TableID mapped_table_id; + TableID mapped_table_id = InvalidTableID; + KeyspaceID keyspace_id = NullspaceID; }; } // namespace DB diff --git a/dbms/src/Storages/Transaction/RegionState.cpp b/dbms/src/Storages/Transaction/RegionState.cpp index cae75518709..bcc5444b5bd 100644 --- a/dbms/src/Storages/Transaction/RegionState.cpp +++ b/dbms/src/Storages/Transaction/RegionState.cpp @@ -122,11 +122,12 @@ raft_serverpb::MergeState & RegionState::getMutMergeState() bool computeMappedTableID(const DecodedTiKVKey & key, TableID & table_id) { + auto k = key.getUserKey(); // t table_id _r - if (key.size() >= (1 + 8 + 2) && key[0] == RecordKVFormat::TABLE_PREFIX - && memcmp(key.data() + 9, RecordKVFormat::RECORD_PREFIX_SEP, 2) == 0) + if (k.size() >= (1 + 8 + 2) && k[0] == RecordKVFormat::TABLE_PREFIX + && memcmp(k.data() + 9, RecordKVFormat::RECORD_PREFIX_SEP, 2) == 0) { - table_id = RecordKVFormat::getTableId(key); + table_id = RecordKVFormat::getTableId(k); return true; } @@ -138,6 +139,7 @@ RegionRangeKeys::RegionRangeKeys(TiKVKey && start_key, TiKVKey && end_key) , raw(std::make_shared(ori.first.key.empty() ? DecodedTiKVKey() : RecordKVFormat::decodeTiKVKey(ori.first.key)), std::make_shared(ori.second.key.empty() ? DecodedTiKVKey() : RecordKVFormat::decodeTiKVKey(ori.second.key))) { + keyspace_id = raw.first->getKeyspaceID(); if (!computeMappedTableID(*raw.first, mapped_table_id) || ori.first.compare(ori.second) >= 0) { throw Exception("Illegal region range, should not happen, start key: " + ori.first.key.toDebugString() @@ -151,6 +153,11 @@ TableID RegionRangeKeys::getMappedTableID() const return mapped_table_id; } +KeyspaceID RegionRangeKeys::getKeyspaceID() const +{ + return keyspace_id; +} + const std::pair & RegionRangeKeys::rawKeys() const { return raw; diff --git a/dbms/src/Storages/Transaction/RegionTable.cpp b/dbms/src/Storages/Transaction/RegionTable.cpp index 4e0cef7ddad..164d1466461 100644 --- a/dbms/src/Storages/Transaction/RegionTable.cpp +++ b/dbms/src/Storages/Transaction/RegionTable.cpp @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +#include #include #include #include @@ -35,13 +36,14 @@ extern const int ILLFORMAT_RAFT_ROW; extern const int TABLE_IS_DROPPED; } // namespace ErrorCodes -RegionTable::Table & RegionTable::getOrCreateTable(const TableID table_id) +RegionTable::Table & RegionTable::getOrCreateTable(const KeyspaceID keyspace_id, const TableID table_id) { - auto it = tables.find(table_id); + auto ks_tb_id = KeyspaceTableID{keyspace_id, table_id}; + auto it = tables.find(ks_tb_id); if (it == tables.end()) { // Load persisted info. - it = tables.emplace(table_id, table_id).first; + it = tables.emplace(ks_tb_id, table_id).first; LOG_INFO(log, "get new table {}", table_id); } return it->second; @@ -55,6 +57,7 @@ RegionTable::InternalRegion & RegionTable::insertRegion(Table & table, const Reg RegionTable::InternalRegion & RegionTable::insertRegion(Table & table, const RegionRangeKeys & region_range_keys, const RegionID region_id) { + auto keyspace_id = region_range_keys.getKeyspaceID(); auto & table_regions = table.regions; // Insert table mapping. // todo check if region_range_keys.mapped_table_id == table.table_id ?? @@ -65,20 +68,21 @@ RegionTable::InternalRegion & RegionTable::insertRegion(Table & table, const Reg ErrorCodes::LOGICAL_ERROR); // Insert region mapping. - regions[region_id] = table.table_id; + regions[region_id] = KeyspaceTableID{keyspace_id, table.table_id}; return it->second; } -RegionTable::InternalRegion & RegionTable::doGetInternalRegion(DB::TableID table_id, DB::RegionID region_id) +RegionTable::InternalRegion & RegionTable::doGetInternalRegion(KeyspaceTableID ks_tb_id, DB::RegionID region_id) { - return tables.find(table_id)->second.regions.find(region_id)->second; + return tables.find(ks_tb_id)->second.regions.find(region_id)->second; } RegionTable::InternalRegion & RegionTable::getOrInsertRegion(const Region & region) { + auto keyspace_id = region.getKeyspaceID(); auto table_id = region.getMappedTableID(); - auto & table = getOrCreateTable(table_id); + auto & table = getOrCreateTable(keyspace_id, table_id); auto & table_regions = table.regions; if (auto it = table_regions.find(region.id()); it != table_regions.end()) return it->second; @@ -175,11 +179,11 @@ void RegionTable::restore() LOG_INFO(log, "Restore {} tables", tables.size()); } -void RegionTable::removeTable(TableID table_id) +void RegionTable::removeTable(KeyspaceID keyspace_id, TableID table_id) { std::lock_guard lock(mutex); - auto it = tables.find(table_id); + auto it = tables.find(KeyspaceTableID{keyspace_id, table_id}); if (it == tables.end()) return; auto & table = it->second; @@ -215,11 +219,11 @@ namespace /// Note that this function will try to acquire lock by `IStorage->lockForShare` void removeObsoleteDataInStorage( Context * const context, - const TableID table_id, + const KeyspaceTableID ks_table_id, const std::pair & handle_range) { TMTContext & tmt = context->getTMTContext(); - auto storage = tmt.getStorages().get(table_id); + auto storage = tmt.getStorages().get(ks_table_id.first, ks_table_id.second); // For DT only now if (!storage || storage->engineType() != TiDB::StorageEngine::DT) return; @@ -235,7 +239,7 @@ void removeObsoleteDataInStorage( /// Now we assume that these won't block for long time. auto rowkey_range - = DM::RowKeyRange::fromRegionRange(handle_range, table_id, table_id, storage->isCommonHandle(), storage->getRowKeyColumnSize()); + = DM::RowKeyRange::fromRegionRange(handle_range, ks_table_id.second, ks_table_id.second, storage->isCommonHandle(), storage->getRowKeyColumnSize()); dm_storage->deleteRange(rowkey_range, context->getSettingsRef()); dm_storage->flushCache(*context, rowkey_range, /*try_until_succeed*/ true); // flush to disk } @@ -250,7 +254,7 @@ void removeObsoleteDataInStorage( void RegionTable::removeRegion(const RegionID region_id, bool remove_data, const RegionTaskLock &) { - TableID table_id = 0; + KeyspaceTableID ks_tb_id; std::pair handle_range; { @@ -264,8 +268,8 @@ void RegionTable::removeRegion(const RegionID region_id, bool remove_data, const return; } - table_id = it->second; - auto & table = tables.find(table_id)->second; + ks_tb_id = it->second; + auto & table = tables.find(ks_tb_id)->second; auto internal_region_it = table.regions.find(region_id); handle_range = internal_region_it->second.range_in_table; @@ -277,7 +281,7 @@ void RegionTable::removeRegion(const RegionID region_id, bool remove_data, const table.regions.erase(internal_region_it); if (table.regions.empty()) { - tables.erase(table_id); + tables.erase(ks_tb_id); } LOG_INFO(log, "remove [region {}] in RegionTable done", region_id); } @@ -291,7 +295,7 @@ void RegionTable::removeRegion(const RegionID region_id, bool remove_data, const // But caller(KVStore) should ensure that no new data write into this handle_range // before `removeObsoleteDataInStorage` is done. (by param `RegionTaskLock`) // And this is expected not to block for long time. - removeObsoleteDataInStorage(context, table_id, handle_range); + removeObsoleteDataInStorage(context, ks_tb_id, handle_range); LOG_INFO(log, "remove region [{}] in storage done", region_id); } } @@ -387,8 +391,7 @@ RegionID RegionTable::pickRegionToFlush() auto region_id = *dirty_it; if (auto it = regions.find(region_id); it != regions.end()) { - auto table_id = it->second; - if (shouldFlush(doGetInternalRegion(table_id, region_id))) + if (shouldFlush(doGetInternalRegion(it->second, region_id))) { // The dirty flag should only be removed after data is flush successfully. return region_id; @@ -416,19 +419,19 @@ bool RegionTable::tryFlushRegions() return false; } -void RegionTable::handleInternalRegionsByTable(const TableID table_id, std::function && callback) const +void RegionTable::handleInternalRegionsByTable(const KeyspaceID keyspace_id, const TableID table_id, std::function && callback) const { std::lock_guard lock(mutex); - if (auto it = tables.find(table_id); it != tables.end()) + if (auto it = tables.find(KeyspaceTableID{keyspace_id, table_id}); it != tables.end()) callback(it->second.regions); } -std::vector> RegionTable::getRegionsByTable(const TableID table_id) const +std::vector> RegionTable::getRegionsByTable(const KeyspaceID keyspace_id, const TableID table_id) const { auto & kvstore = context->getTMTContext().getKVStore(); std::vector> regions; - handleInternalRegionsByTable(table_id, [&](const InternalRegions & internal_regions) { + handleInternalRegionsByTable(keyspace_id, table_id, [&](const InternalRegions & internal_regions) { for (const auto & region_info : internal_regions) { auto region = kvstore->getRegion(region_info.first); @@ -448,17 +451,25 @@ void RegionTable::extendRegionRange(const RegionID region_id, const RegionRangeK { std::lock_guard lock(mutex); + auto keyspace_id = region_range_keys.getKeyspaceID(); auto table_id = region_range_keys.getMappedTableID(); + auto ks_tbl_id = KeyspaceTableID{keyspace_id, table_id}; auto new_handle_range = region_range_keys.rawKeys(); if (auto it = regions.find(region_id); it != regions.end()) { - if (table_id != it->second) - throw Exception(std::string(__PRETTY_FUNCTION__) + ": table id " + std::to_string(table_id) + " not match previous one " - + std::to_string(it->second) + " in regions " + std::to_string(region_id), - ErrorCodes::LOGICAL_ERROR); - - InternalRegion & internal_region = doGetInternalRegion(table_id, region_id); + RUNTIME_CHECK_MSG( + ks_tbl_id == it->second, + "{}: table id not match the previous one" + ", region_id={} keyspace_id={} table_id={}, old_keyspace_id={} old_table_id={}", + __PRETTY_FUNCTION__, + region_id, + keyspace_id, + table_id, + it->second.first, + it->second.second); + + InternalRegion & internal_region = doGetInternalRegion(ks_tbl_id, region_id); if (*(internal_region.range_in_table.first) <= *(new_handle_range.first) && *(internal_region.range_in_table.second) >= *(new_handle_range.second)) { @@ -476,7 +487,7 @@ void RegionTable::extendRegionRange(const RegionID region_id, const RegionRangeK } else { - auto & table = getOrCreateTable(table_id); + auto & table = getOrCreateTable(keyspace_id, table_id); insertRegion(table, region_range_keys, region_id); LOG_INFO(log, "table {} insert internal region {}", table_id, region_id); } diff --git a/dbms/src/Storages/Transaction/RegionTable.h b/dbms/src/Storages/Transaction/RegionTable.h index 36686b44d90..03f60f1644f 100644 --- a/dbms/src/Storages/Transaction/RegionTable.h +++ b/dbms/src/Storages/Transaction/RegionTable.h @@ -96,8 +96,8 @@ class RegionTable : private boost::noncopyable InternalRegions regions; }; - using TableMap = std::unordered_map; - using RegionInfoMap = std::unordered_map; + using TableMap = std::unordered_map>; + using RegionInfoMap = std::unordered_map; // safe ts is maintained by check_leader RPC (https://github.com/tikv/tikv/blob/1ea26a2ac8761af356cc5c0825eb89a0b8fc9749/components/resolved_ts/src/advance.rs#L262), // leader_safe_ts is the safe_ts in leader, leader will send to learner to advance safe_ts of learner, and TiFlash will record the safe_ts into safe_ts_map in check_leader RPC. @@ -161,8 +161,8 @@ class RegionTable : private boost::noncopyable RegionDataReadInfoList tryFlushRegion(RegionID region_id, bool try_persist = false); RegionDataReadInfoList tryFlushRegion(const RegionPtrWithBlock & region, bool try_persist); - void handleInternalRegionsByTable(TableID table_id, std::function && callback) const; - std::vector> getRegionsByTable(TableID table_id) const; + void handleInternalRegionsByTable(KeyspaceID keyspace_id, TableID table_id, std::function && callback) const; + std::vector> getRegionsByTable(KeyspaceID keyspace_id, TableID table_id) const; /// Write the data of the given region into the table with the given table ID, fill the data list for outer to remove. /// Will trigger schema sync on read error for only once, @@ -202,12 +202,12 @@ class RegionTable : private boost::noncopyable friend class MockTiDB; friend class StorageDeltaMerge; - Table & getOrCreateTable(TableID table_id); - void removeTable(TableID table_id); + Table & getOrCreateTable(KeyspaceID keyspace_id, TableID table_id); + void removeTable(KeyspaceID keyspace_id, TableID table_id); InternalRegion & getOrInsertRegion(const Region & region); InternalRegion & insertRegion(Table & table, const RegionRangeKeys & region_range_keys, RegionID region_id); InternalRegion & insertRegion(Table & table, const Region & region); - InternalRegion & doGetInternalRegion(TableID table_id, RegionID region_id); + InternalRegion & doGetInternalRegion(KeyspaceTableID ks_tb_id, RegionID region_id); RegionDataReadInfoList flushRegion(const RegionPtrWithBlock & region, bool try_persist) const; bool shouldFlush(const InternalRegion & region) const; diff --git a/dbms/src/Storages/Transaction/TMTStorages.cpp b/dbms/src/Storages/Transaction/TMTStorages.cpp index e1b0ac0826d..5340f5cce8b 100644 --- a/dbms/src/Storages/Transaction/TMTStorages.cpp +++ b/dbms/src/Storages/Transaction/TMTStorages.cpp @@ -27,36 +27,46 @@ void ManagedStorages::put(ManageableStoragePtr storage) { std::lock_guard lock(mutex); + KeyspaceID keyspace_id = storage->getTableInfo().keyspace_id; TableID table_id = storage->getTableInfo().id; - if (storages.find(table_id) != storages.end() && table_id != DB::InvalidTableID) + auto keyspace_table_id = KeyspaceTableID{keyspace_id, table_id}; + if (storages.find(keyspace_table_id) != storages.end() && table_id != DB::InvalidTableID) { // If table already exists, and is not created through ch-client (which table_id could be unspecified) // throw Exception throw Exception("TiDB table with id " + DB::toString(table_id) + " already exists.", ErrorCodes::TIDB_TABLE_ALREADY_EXISTS); } - storages.emplace(table_id, storage); + storages.emplace(keyspace_table_id, storage); + auto [it, _] = keyspaces.try_emplace(keyspace_id, 0); + it->second++; } -ManageableStoragePtr ManagedStorages::get(TableID table_id) const +ManageableStoragePtr ManagedStorages::get(KeyspaceID keyspace_id, TableID table_id) const { std::lock_guard lock(mutex); - if (auto it = storages.find(table_id); it != storages.end()) + if (auto it = storages.find(KeyspaceTableID{keyspace_id, table_id}); it != storages.end()) return it->second; return nullptr; } -std::unordered_map ManagedStorages::getAllStorage() const +StorageMap ManagedStorages::getAllStorage() const { std::lock_guard lock(mutex); return storages; } +KeyspaceSet ManagedStorages::getAllKeyspaces() const +{ + std::lock_guard lock(mutex); + return keyspaces; +} + ManageableStoragePtr ManagedStorages::getByName(const std::string & db, const std::string & table, bool include_tombstone) const { std::lock_guard lock(mutex); - auto it = std::find_if(storages.begin(), storages.end(), [&](const std::pair & pair) { + auto it = std::find_if(storages.begin(), storages.end(), [&](const std::pair & pair) { const auto & storage = pair.second; return (include_tombstone || !storage->isTombstone()) && storage->getDatabaseName() == db && storage->getTableInfo().name == table; }); @@ -65,14 +75,17 @@ ManageableStoragePtr ManagedStorages::getByName(const std::string & db, const st return it->second; } -void ManagedStorages::remove(TableID table_id) +void ManagedStorages::remove(KeyspaceID keyspace_id, TableID table_id) { std::lock_guard lock(mutex); - auto it = storages.find(table_id); + auto it = storages.find(KeyspaceTableID{keyspace_id, table_id}); if (it == storages.end()) return; storages.erase(it); + keyspaces[keyspace_id]--; + if (!keyspaces[keyspace_id]) + keyspaces.erase(keyspace_id); } } // namespace DB diff --git a/dbms/src/Storages/Transaction/TMTStorages.h b/dbms/src/Storages/Transaction/TMTStorages.h index 9df36caa999..55c897c715b 100644 --- a/dbms/src/Storages/Transaction/TMTStorages.h +++ b/dbms/src/Storages/Transaction/TMTStorages.h @@ -26,21 +26,28 @@ class IManageableStorage; class StorageDeltaMerge; using StorageDeltaMergePtr = std::shared_ptr; using ManageableStoragePtr = std::shared_ptr; +using StorageMap = std::unordered_map>; +using KeyspaceSet = std::unordered_map; class ManagedStorages : private boost::noncopyable { public: void put(ManageableStoragePtr storage); - ManageableStoragePtr get(TableID table_id) const; - std::unordered_map getAllStorage() const; + // Get storage by keyspace and table id + ManageableStoragePtr get(KeyspaceID keyspace_id, TableID table_id) const; + // Get all the storages of all the keyspaces in this instance. + StorageMap getAllStorage() const; + // Get all the existing keyspaces in this instance. A map of `{KeySpaceID => num of physical tables}`. + KeyspaceSet getAllKeyspaces() const; ManageableStoragePtr getByName(const std::string & db, const std::string & table, bool include_tombstone) const; - void remove(TableID table_id); + void remove(KeyspaceID keyspace_id, TableID table_id); private: - std::unordered_map storages; + StorageMap storages; + KeyspaceSet keyspaces; mutable std::mutex mutex; }; diff --git a/dbms/src/Storages/Transaction/TiDB.cpp b/dbms/src/Storages/Transaction/TiDB.cpp index b2f32030a5a..2215e1b0856 100644 --- a/dbms/src/Storages/Transaction/TiDB.cpp +++ b/dbms/src/Storages/Transaction/TiDB.cpp @@ -618,6 +618,7 @@ try Poco::JSON::Object::Ptr json = new Poco::JSON::Object(); json->set("id", id); + json->set("keyspace_id", keyspace_id); Poco::JSON::Object::Ptr name_json = new Poco::JSON::Object(); name_json->set("O", name); name_json->set("L", name); @@ -646,6 +647,10 @@ try Poco::Dynamic::Var result = parser.parse(json_str); auto obj = result.extract(); id = obj->getValue("id"); + if (obj->has("keyspace_id")) + { + keyspace_id = obj->getValue("keyspace_id"); + } name = obj->get("db_name").extract()->get("L").convert(); charset = obj->get("charset").convert(); collate = obj->get("collate").convert(); @@ -807,14 +812,23 @@ catch (const Poco::Exception & e) /////////////////////// ////// TableInfo ////// /////////////////////// -TableInfo::TableInfo(Poco::JSON::Object::Ptr json) +TableInfo::TableInfo(Poco::JSON::Object::Ptr json, KeyspaceID keyspace_id_) { deserialize(json); + if (keyspace_id == NullspaceID) + { + keyspace_id = keyspace_id_; + } } -TableInfo::TableInfo(const String & table_info_json) +TableInfo::TableInfo(const String & table_info_json, KeyspaceID keyspace_id_) { deserialize(table_info_json); + // If the table_info_json has no keyspace id, we use the keyspace_id_ as the default value. + if (keyspace_id == NullspaceID) + { + keyspace_id = keyspace_id_; + } } String TableInfo::serialize() const @@ -824,6 +838,7 @@ try Poco::JSON::Object::Ptr json = new Poco::JSON::Object(); json->set("id", id); + json->set("keyspace_id", keyspace_id); Poco::JSON::Object::Ptr name_json = new Poco::JSON::Object(); name_json->set("O", name); name_json->set("L", name); @@ -894,6 +909,10 @@ void TableInfo::deserialize(Poco::JSON::Object::Ptr obj) try { id = obj->getValue("id"); + if (obj->has("keyspace_id")) + { + keyspace_id = obj->getValue("keyspace_id"); + } name = obj->getObject("name")->getValue("L"); auto cols_arr = obj->getArray("cols"); diff --git a/dbms/src/Storages/Transaction/TiDB.h b/dbms/src/Storages/Transaction/TiDB.h index 5062e38079d..5f818bf10dd 100644 --- a/dbms/src/Storages/Transaction/TiDB.h +++ b/dbms/src/Storages/Transaction/TiDB.h @@ -47,6 +47,8 @@ namespace TiDB { using DB::ColumnID; using DB::DatabaseID; +using DB::KeyspaceID; +using DB::NullspaceID; using DB::String; using DB::TableID; using DB::Timestamp; @@ -274,13 +276,21 @@ struct PartitionInfo struct DBInfo { DatabaseID id = -1; + KeyspaceID keyspace_id = NullspaceID; String name; String charset; String collate; SchemaState state; DBInfo() = default; - explicit DBInfo(const String & json) { deserialize(json); } + explicit DBInfo(const String & json, KeyspaceID keyspace_id_) + { + deserialize(json); + if (keyspace_id == NullspaceID) + { + keyspace_id = keyspace_id_; + } + } String serialize() const; @@ -352,9 +362,9 @@ struct TableInfo TableInfo & operator=(const TableInfo &) = default; - explicit TableInfo(Poco::JSON::Object::Ptr json); + explicit TableInfo(Poco::JSON::Object::Ptr json, KeyspaceID keyspace_id_); - explicit TableInfo(const String & table_info_json); + explicit TableInfo(const String & table_info_json, KeyspaceID keyspace_id_); String serialize() const; @@ -367,6 +377,8 @@ struct TableInfo // and partition ID for partition table, // whereas field `belonging_table_id` below actually means the table ID this partition belongs to. TableID id = DB::InvalidTableID; + // The keyspace where the table belongs to. + KeyspaceID keyspace_id = NullspaceID; String name; // Columns are listed in the order in which they appear in the schema. std::vector columns; diff --git a/dbms/src/Storages/Transaction/TiKVKeyValue.cpp b/dbms/src/Storages/Transaction/TiKVKeyValue.cpp new file mode 100644 index 00000000000..4829aca7fa1 --- /dev/null +++ b/dbms/src/Storages/Transaction/TiKVKeyValue.cpp @@ -0,0 +1,49 @@ +// Copyright 2023 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include + +namespace DB +{ +KeyspaceID DecodedTiKVKey::getKeyspaceID() const +{ + if (size() < KEYSPACE_PREFIX_LEN || *begin() != TXN_MODE_PREFIX) + return NullspaceID; + + char buf[KEYSPACE_PREFIX_LEN]; + memcpy(buf, data(), KEYSPACE_PREFIX_LEN); + buf[0] = 0; + return toBigEndian(*reinterpret_cast(buf)); +} + +std::string_view DecodedTiKVKey::getUserKey() const +{ + if (size() < KEYSPACE_PREFIX_LEN || *begin() != TXN_MODE_PREFIX) + return std::string_view(c_str(), size()); + + return std::string_view(c_str() + KEYSPACE_PREFIX_LEN, size() - KEYSPACE_PREFIX_LEN); +} + +std::string DecodedTiKVKey::makeKeyspacePrefix(KeyspaceID keyspace_id) +{ + if (keyspace_id == NullspaceID) + return std::string(); + std::string prefix(KEYSPACE_PREFIX_LEN, 0); + keyspace_id = toBigEndian(keyspace_id); + memcpy(prefix.data(), reinterpret_cast(&keyspace_id), KEYSPACE_PREFIX_LEN); + prefix[0] = TXN_MODE_PREFIX; + return prefix; +} +} // namespace DB \ No newline at end of file diff --git a/dbms/src/Storages/Transaction/TiKVKeyValue.h b/dbms/src/Storages/Transaction/TiKVKeyValue.h index 45db99b03a4..5b86d2219d0 100644 --- a/dbms/src/Storages/Transaction/TiKVKeyValue.h +++ b/dbms/src/Storages/Transaction/TiKVKeyValue.h @@ -21,6 +21,9 @@ namespace DB { +static const size_t KEYSPACE_PREFIX_LEN = 4; +static const char TXN_MODE_PREFIX = 'x'; + template struct StringObject : std::string { @@ -100,6 +103,10 @@ struct DecodedTiKVKey : std::string (Base &)* this = (Base &&) obj; return *this; } + + KeyspaceID getKeyspaceID() const; + std::string_view getUserKey() const; + static std::string makeKeyspacePrefix(KeyspaceID keyspace_id); }; static_assert(sizeof(DecodedTiKVKey) == sizeof(std::string)); diff --git a/dbms/src/Storages/Transaction/TiKVRecordFormat.h b/dbms/src/Storages/Transaction/TiKVRecordFormat.h index 2c1739bb609..b4c296816fa 100644 --- a/dbms/src/Storages/Transaction/TiKVRecordFormat.h +++ b/dbms/src/Storages/Transaction/TiKVRecordFormat.h @@ -212,21 +212,29 @@ inline Timestamp getTs(const TiKVKey & key) return decodeUInt64Desc(read(key.data() + key.dataSize() - 8)); } -inline TableID getTableId(const DecodedTiKVKey & key) +template +inline TableID getTableId(const T & key) { return decodeInt64(read(key.data() + 1)); } inline HandleID getHandle(const DecodedTiKVKey & key) { - return decodeInt64(read(key.data() + RAW_KEY_NO_HANDLE_SIZE)); + return decodeInt64(read(key.getUserKey().data())); +} + +inline std::string_view getRawTiDBPKView(const DecodedTiKVKey & key) +{ + auto user_key = key.getUserKey(); + return std::string_view(user_key.data() + RAW_KEY_NO_HANDLE_SIZE, user_key.size() - RAW_KEY_NO_HANDLE_SIZE); } inline RawTiDBPK getRawTiDBPK(const DecodedTiKVKey & key) { - return std::make_shared(key.begin() + RAW_KEY_NO_HANDLE_SIZE, key.end()); + return std::make_shared(getRawTiDBPKView(key)); } + inline TableID getTableId(const TiKVKey & key) { return getTableId(decodeTiKVKey(key)); diff --git a/dbms/src/Storages/Transaction/Types.h b/dbms/src/Storages/Transaction/Types.h index 9bb338b2d33..e35e01fc09e 100644 --- a/dbms/src/Storages/Transaction/Types.h +++ b/dbms/src/Storages/Transaction/Types.h @@ -16,6 +16,7 @@ #include #include +#include #include #include @@ -27,6 +28,11 @@ static constexpr StoreID InvalidStoreID = 0; using TableID = Int64; using TableIDSet = std::unordered_set; +using KeyspaceID = pingcap::pd::KeyspaceID; + +using KeyspaceTableID = std::pair; + +static auto const NullspaceID = pingcap::pd::NullspaceID; enum : TableID { @@ -35,6 +41,8 @@ enum : TableID using DatabaseID = Int64; +using KeyspaceDatabaseID = std::pair; + using ColumnID = Int64; enum : ColumnID diff --git a/dbms/src/TiDB/Schema/SchemaBuilder.cpp b/dbms/src/TiDB/Schema/SchemaBuilder.cpp index 68eb59f6e1e..7752c9898d9 100644 --- a/dbms/src/TiDB/Schema/SchemaBuilder.cpp +++ b/dbms/src/TiDB/Schema/SchemaBuilder.cpp @@ -403,7 +403,7 @@ void SchemaBuilder::applyAlterTable(const DBInfoPtr & db_inf throw TiFlashException(fmt::format("miss table in TiKV : {}", table_id), Errors::DDL::StaleSchema); } auto & tmt_context = context.getTMTContext(); - auto storage = tmt_context.getStorages().get(table_info->id); + auto storage = tmt_context.getStorages().get(keyspace_id, table_info->id); if (storage == nullptr) { throw TiFlashException(fmt::format("miss table in TiFlash : {}", name_mapper.debugCanonicalName(*db_info, *table_info)), @@ -427,7 +427,7 @@ void SchemaBuilder::applyAlterLogicalTable(const DBInfoPtr & for (const auto & part_def : table_info->partition.definitions) { auto part_table_info = table_info->producePartitionTableInfo(part_def.id, name_mapper); - auto part_storage = tmt_context.getStorages().get(part_def.id); + auto part_storage = tmt_context.getStorages().get(keyspace_id, part_def.id); if (part_storage == nullptr) { throw TiFlashException(fmt::format("miss table in TiFlash : {}, partition: {}.", name_mapper.debugCanonicalName(*db_info, *table_info), part_def.id), @@ -590,7 +590,7 @@ void SchemaBuilder::applyPartitionDiff(const TiDB::DBInfoPtr } auto & tmt_context = context.getTMTContext(); - auto storage = tmt_context.getStorages().get(table_info->id); + auto storage = tmt_context.getStorages().get(keyspace_id, table_info->id); if (storage == nullptr) { throw TiFlashException(fmt::format("miss table in TiFlash {}", table_id), Errors::DDL::MissingTable); @@ -675,7 +675,7 @@ void SchemaBuilder::applyRenameTable(const DBInfoPtr & new_d } auto & tmt_context = context.getTMTContext(); - auto storage = tmt_context.getStorages().get(table_id); + auto storage = tmt_context.getStorages().get(keyspace_id, table_id); if (storage == nullptr) { throw TiFlashException(fmt::format("miss table id in TiFlash {}", table_id), Errors::DDL::MissingTable); @@ -697,7 +697,7 @@ void SchemaBuilder::applyRenameLogicalTable( auto & tmt_context = context.getTMTContext(); for (const auto & part_def : new_table_info->partition.definitions) { - auto part_storage = tmt_context.getStorages().get(part_def.id); + auto part_storage = tmt_context.getStorages().get(keyspace_id, part_def.id); if (part_storage == nullptr) { throw Exception(fmt::format("miss old table id in Flash {}", part_def.id)); @@ -784,7 +784,7 @@ void SchemaBuilder::applyExchangeTablePartition(const Schema if (table_info == nullptr) throw TiFlashException(fmt::format("miss table in TiKV : {}", pt_table_info), Errors::DDL::StaleSchema); auto & tmt_context = context.getTMTContext(); - auto storage = tmt_context.getStorages().get(table_info->id); + auto storage = tmt_context.getStorages().get(keyspace_id, table_info->id); if (storage == nullptr) throw TiFlashException( fmt::format("miss table in TiFlash : {}", name_mapper.debugCanonicalName(*pt_db_info, *table_info)), @@ -806,7 +806,7 @@ void SchemaBuilder::applyExchangeTablePartition(const Schema FAIL_POINT_TRIGGER_EXCEPTION(FailPoints::exception_after_step_1_in_exchange_partition); /// step 2 change non partition table to a partition of the partition table - storage = tmt_context.getStorages().get(npt_table_id); + storage = tmt_context.getStorages().get(keyspace_id, npt_table_id); if (storage == nullptr) throw TiFlashException(fmt::format("miss table in TiFlash : {}", name_mapper.debugCanonicalName(*npt_db_info, *table_info)), Errors::DDL::MissingTable); @@ -835,7 +835,7 @@ void SchemaBuilder::applyExchangeTablePartition(const Schema table_info = getter.getTableInfo(npt_db_info->id, pt_partition_id); if (table_info == nullptr) throw TiFlashException(fmt::format("miss table in TiKV : {}", pt_partition_id), Errors::DDL::StaleSchema); - storage = tmt_context.getStorages().get(table_info->id); + storage = tmt_context.getStorages().get(keyspace_id, table_info->id); if (storage == nullptr) throw TiFlashException( fmt::format("miss table in TiFlash : {}", name_mapper.debugCanonicalName(*pt_db_info, *table_info)), @@ -929,14 +929,15 @@ void SchemaBuilder::applyCreateSchema(const TiDB::DBInfoPtr interpreter.setForceRestoreData(false); interpreter.execute(); - databases[db_info->id] = db_info; + databases.emplace(KeyspaceDatabaseID{keyspace_id, db_info->id}, db_info); LOG_INFO(log, "Created database {}", name_mapper.debugDatabaseName(*db_info)); } template void SchemaBuilder::applyDropSchema(DatabaseID schema_id) { - auto it = databases.find(schema_id); + auto ks_db_id = KeyspaceDatabaseID{keyspace_id, schema_id}; + auto it = databases.find(ks_db_id); if (unlikely(it == databases.end())) { LOG_INFO( @@ -946,7 +947,7 @@ void SchemaBuilder::applyDropSchema(DatabaseID schema_id) return; } applyDropSchema(name_mapper.mapDatabaseName(*it->second)); - databases.erase(schema_id); + databases.erase(ks_db_id); } template @@ -1065,7 +1066,7 @@ void SchemaBuilder::applyCreatePhysicalTable(const DBInfoPtr /// Check if this is a RECOVER table. { auto & tmt_context = context.getTMTContext(); - if (auto * storage = tmt_context.getStorages().get(table_info->id).get(); storage) + if (auto * storage = tmt_context.getStorages().get(keyspace_id, table_info->id).get(); storage) { if (!storage->isTombstone()) { @@ -1150,7 +1151,7 @@ template void SchemaBuilder::applyDropPhysicalTable(const String & db_name, TableID table_id) { auto & tmt_context = context.getTMTContext(); - auto storage = tmt_context.getStorages().get(table_id); + auto storage = tmt_context.getStorages().get(keyspace_id, table_id); if (storage == nullptr) { LOG_DEBUG(log, "table {} does not exist.", table_id); @@ -1179,7 +1180,7 @@ template void SchemaBuilder::applyDropTable(const DBInfoPtr & db_info, TableID table_id) { auto & tmt_context = context.getTMTContext(); - auto * storage = tmt_context.getStorages().get(table_id).get(); + auto * storage = tmt_context.getStorages().get(keyspace_id, table_id).get(); if (storage == nullptr) { LOG_DEBUG(log, "table {} does not exist.", table_id); @@ -1209,7 +1210,7 @@ void SchemaBuilder::applySetTiFlashReplica(const TiDB::DBInf } auto & tmt_context = context.getTMTContext(); - auto storage = tmt_context.getStorages().get(latest_table_info->id); + auto storage = tmt_context.getStorages().get(keyspace_id, latest_table_info->id); if (unlikely(storage == nullptr)) { throw TiFlashException(fmt::format("miss table in TiFlash : {}", name_mapper.debugCanonicalName(*db_info, *latest_table_info)), @@ -1231,7 +1232,7 @@ void SchemaBuilder::applySetTiFlashReplicaOnLogicalTable(con for (const auto & part_def : table_info->partition.definitions) { auto new_part_table_info = table_info->producePartitionTableInfo(part_def.id, name_mapper); - auto part_storage = tmt_context.getStorages().get(new_part_table_info->id); + auto part_storage = tmt_context.getStorages().get(keyspace_id, new_part_table_info->id); if (unlikely(part_storage == nullptr)) { throw TiFlashException(fmt::format("miss table in TiFlash : {}", name_mapper.debugCanonicalName(*db_info, *new_part_table_info)), @@ -1279,7 +1280,7 @@ void SchemaBuilder::syncAllSchema() for (const auto & db : all_schemas) { db_set.emplace(name_mapper.mapDatabaseName(*db)); - if (databases.find(db->id) == databases.end()) + if (databases.find(KeyspaceDatabaseID{keyspace_id, db->id}) == databases.end()) { applyCreateSchema(db); LOG_DEBUG(log, "Database {} created during sync all schemas", name_mapper.debugDatabaseName(*db)); @@ -1311,12 +1312,12 @@ void SchemaBuilder::syncAllSchema() }); } - auto storage = tmt_context.getStorages().get(table->id); + auto storage = tmt_context.getStorages().get(keyspace_id, table->id); if (storage == nullptr) { /// Create if not exists. applyCreateLogicalTable(db, table); - storage = tmt_context.getStorages().get(table->id); + storage = tmt_context.getStorages().get(keyspace_id, table->id); if (storage == nullptr) { /// This is abnormal as the storage shouldn't be null after creation, the underlying table must already be existing for unknown reason. @@ -1346,10 +1347,15 @@ void SchemaBuilder::syncAllSchema() auto storage_map = tmt_context.getStorages().getAllStorage(); for (auto it = storage_map.begin(); it != storage_map.end(); it++) { - if (table_set.count(it->first) == 0) + auto table_info = it->second->getTableInfo(); + if (table_info.keyspace_id != keyspace_id) { - applyDropPhysicalTable(it->second->getDatabaseName(), it->first); - LOG_DEBUG(log, "Table {}.{} dropped during sync all schemas", it->second->getDatabaseName(), name_mapper.debugTableName(it->second->getTableInfo())); + continue; + } + if (table_set.count(table_info.id) == 0) + { + applyDropPhysicalTable(it->second->getDatabaseName(), table_info.id); + LOG_DEBUG(log, "Table {}.{} dropped during sync all schemas", it->second->getDatabaseName(), name_mapper.debugTableName(table_info)); } } @@ -1357,6 +1363,11 @@ void SchemaBuilder::syncAllSchema() const auto & dbs = context.getDatabases(); for (auto it = dbs.begin(); it != dbs.end(); it++) { + auto db_keyspace_id = SchemaNameMapper::getMappedNameKeyspaceID(it->first); + if (db_keyspace_id != keyspace_id) + { + continue; + } if (db_set.count(it->first) == 0 && !isReservedDatabase(context, it->first)) { applyDropSchema(it->first); diff --git a/dbms/src/TiDB/Schema/SchemaBuilder.h b/dbms/src/TiDB/Schema/SchemaBuilder.h index 4abb4abe0d6..4e8797e9504 100644 --- a/dbms/src/TiDB/Schema/SchemaBuilder.h +++ b/dbms/src/TiDB/Schema/SchemaBuilder.h @@ -20,6 +20,7 @@ namespace DB { +using KeyspaceDatabaseMap = std::unordered_map>; template struct SchemaBuilder { @@ -29,18 +30,21 @@ struct SchemaBuilder Context & context; - std::unordered_map & databases; + KeyspaceDatabaseMap & databases; Int64 target_version; + const KeyspaceID keyspace_id; + LoggerPtr log; - SchemaBuilder(Getter & getter_, Context & context_, std::unordered_map & dbs_, Int64 version) + SchemaBuilder(Getter & getter_, Context & context_, KeyspaceDatabaseMap & dbs_, Int64 version) : getter(getter_) , context(context_) , databases(dbs_) , target_version(version) - , log(Logger::get()) + , keyspace_id(getter_.getKeyspaceID()) + , log(Logger::get(fmt::format("keyspace={}", keyspace_id))) {} void applyDiff(const SchemaDiff & diff); diff --git a/dbms/src/TiDB/Schema/SchemaGetter.cpp b/dbms/src/TiDB/Schema/SchemaGetter.cpp index 4edd61b3d9e..636da7a4180 100644 --- a/dbms/src/TiDB/Schema/SchemaGetter.cpp +++ b/dbms/src/TiDB/Schema/SchemaGetter.cpp @@ -15,7 +15,6 @@ #include #include #include -#include namespace DB { @@ -102,14 +101,14 @@ struct TxnStructure } public: - static String get(pingcap::kv::Snapshot & snap, const String & key) + static String get(KeyspaceSnapshot & snap, const String & key) { String encode_key = encodeStringDataKey(key); String value = snap.Get(encode_key); return value; } - static String hGet(pingcap::kv::Snapshot & snap, const String & key, const String & field) + static String hGet(KeyspaceSnapshot & snap, const String & key, const String & field) { String encode_key = encodeHashDataKey(key, field); String value = snap.Get(encode_key); @@ -117,7 +116,7 @@ struct TxnStructure } // For convinient, we only return values. - static std::vector> hGetAll(pingcap::kv::Snapshot & snap, const String & key) + static std::vector> hGetAll(KeyspaceSnapshot & snap, const String & key) { auto tikv_key_prefix = hashDataKeyPrefix(key); String tikv_key_end = pingcap::kv::prefixNext(tikv_key_prefix); @@ -249,7 +248,7 @@ TiDB::DBInfoPtr SchemaGetter::getDatabase(DatabaseID db_id) return nullptr; LOG_DEBUG(log, "Get DB Info from TiKV : " + json); - auto db_info = std::make_shared(json); + auto db_info = std::make_shared(json, keyspace_id); return db_info; } @@ -265,7 +264,7 @@ TiDB::TableInfoPtr SchemaGetter::getTableInfo(DatabaseID db_id, TableID table_id if (table_info_json.empty()) return nullptr; LOG_DEBUG(log, "Get Table Info from TiKV : " + table_info_json); - TiDB::TableInfoPtr table_info = std::make_shared(table_info_json); + TiDB::TableInfoPtr table_info = std::make_shared(table_info_json, keyspace_id); return table_info; } @@ -275,7 +274,7 @@ std::vector SchemaGetter::listDBs() auto pairs = TxnStructure::hGetAll(snap, DBs); for (const auto & pair : pairs) { - auto db_info = std::make_shared(pair.second); + auto db_info = std::make_shared(pair.second, keyspace_id); res.push_back(db_info); } return res; @@ -307,7 +306,7 @@ std::vector SchemaGetter::listTables(DatabaseID db_id) continue; } const String & json = kv_pair.second; - auto table_info = std::make_shared(json); + auto table_info = std::make_shared(json, keyspace_id); res.push_back(table_info); } diff --git a/dbms/src/TiDB/Schema/SchemaGetter.h b/dbms/src/TiDB/Schema/SchemaGetter.h index 2e67bd90a29..8eb4644a19a 100644 --- a/dbms/src/TiDB/Schema/SchemaGetter.h +++ b/dbms/src/TiDB/Schema/SchemaGetter.h @@ -14,16 +14,8 @@ #pragma once +#include #include -#pragma GCC diagnostic push -#pragma GCC diagnostic ignored "-Wunused-parameter" -#pragma GCC diagnostic ignored "-Wnon-virtual-dtor" -#ifdef __clang__ -#pragma GCC diagnostic ignored "-Wdeprecated-declarations" -#endif -#include -#pragma GCC diagnostic pop - #include #include @@ -137,14 +129,18 @@ struct SchemaDiff struct SchemaGetter { - pingcap::kv::Snapshot snap; + KeyspaceSnapshot snap; + + KeyspaceID keyspace_id; LoggerPtr log; - SchemaGetter(pingcap::kv::Cluster * cluster_, UInt64 tso_) - : snap(cluster_, tso_) + SchemaGetter(pingcap::kv::Cluster * cluster_, UInt64 tso_, KeyspaceID keyspace_id_) + : snap(keyspace_id_, cluster_, tso_) + , keyspace_id(keyspace_id_) , log(Logger::get()) - {} + { + } Int64 getVersion(); @@ -167,6 +163,8 @@ struct SchemaGetter std::vector listDBs(); std::vector listTables(DatabaseID db_id); + + KeyspaceID getKeyspaceID() const { return keyspace_id; } }; } // namespace DB diff --git a/dbms/src/TiDB/Schema/SchemaNameMapper.h b/dbms/src/TiDB/Schema/SchemaNameMapper.h index 12c014a9020..515a28d9fc4 100644 --- a/dbms/src/TiDB/Schema/SchemaNameMapper.h +++ b/dbms/src/TiDB/Schema/SchemaNameMapper.h @@ -26,18 +26,56 @@ struct SchemaNameMapper static constexpr auto DATABASE_PREFIX = "db_"; static constexpr auto TABLE_PREFIX = "t_"; + static constexpr std::string_view KEYSPACE_PREFIX = "ks_"; - virtual String mapDatabaseName(const TiDB::DBInfo & db_info) const { return DATABASE_PREFIX + std::to_string(db_info.id); } - virtual String displayDatabaseName(const TiDB::DBInfo & db_info) const { return db_info.name; } - virtual String mapTableName(const TiDB::TableInfo & table_info) const { return TABLE_PREFIX + std::to_string(table_info.id); } - virtual String displayTableName(const TiDB::TableInfo & table_info) const { return table_info.name; } + + static KeyspaceID getMappedNameKeyspaceID(const String & name) + { + auto keyspace_prefix_len = KEYSPACE_PREFIX.length(); + auto pos = name.find(KEYSPACE_PREFIX); + if (pos == String::npos) + return NullspaceID; + assert(pos == 0); + pos = name.find('_', keyspace_prefix_len); + assert(pos != String::npos); + return std::stoull(name.substr(keyspace_prefix_len, pos - keyspace_prefix_len)); + } + + static String map2Keyspace(KeyspaceID keyspace_id, const String & name) + { + return keyspace_id == NullspaceID ? name : KEYSPACE_PREFIX.data() + std::to_string(keyspace_id) + "_" + name; + } + + virtual String mapDatabaseName(const TiDB::DBInfo & db_info) const + { + auto db_name = DATABASE_PREFIX + std::to_string(db_info.id); + return map2Keyspace(db_info.keyspace_id, db_name); + } + virtual String displayDatabaseName(const TiDB::DBInfo & db_info) const + { + return map2Keyspace(db_info.keyspace_id, db_info.name); + } + virtual String mapTableName(const TiDB::TableInfo & table_info) const + { + auto table_name = TABLE_PREFIX + std::to_string(table_info.id); + return map2Keyspace(table_info.keyspace_id, table_name); + } + virtual String displayTableName(const TiDB::TableInfo & table_info) const + { + return map2Keyspace(table_info.keyspace_id, table_info.name); + } virtual String mapPartitionName(const TiDB::TableInfo & table_info) const { return mapTableName(table_info); } // Only use for logging / debugging - virtual String debugDatabaseName(const TiDB::DBInfo & db_info) const { return db_info.name + "(" + std::to_string(db_info.id) + ")"; } + virtual String debugDatabaseName(const TiDB::DBInfo & db_info) const + { + auto db_name = db_info.name + "(" + std::to_string(db_info.id) + ")"; + return map2Keyspace(db_info.keyspace_id, db_name); + } virtual String debugTableName(const TiDB::TableInfo & table_info) const { - return table_info.name + "(" + std::to_string(table_info.id) + ")"; + auto table_name = table_info.name + "(" + std::to_string(table_info.id) + ")"; + return map2Keyspace(table_info.keyspace_id, table_name); } virtual String debugCanonicalName(const TiDB::DBInfo & db_info, const TiDB::TableInfo & table_info) const { diff --git a/dbms/src/TiDB/Schema/SchemaSyncService.cpp b/dbms/src/TiDB/Schema/SchemaSyncService.cpp index 9c63f86c472..a85075812da 100644 --- a/dbms/src/TiDB/Schema/SchemaSyncService.cpp +++ b/dbms/src/TiDB/Schema/SchemaSyncService.cpp @@ -37,52 +37,104 @@ SchemaSyncService::SchemaSyncService(DB::Context & context_) , background_pool(context_.getBackgroundPool()) , log(Logger::get()) { + // Add task for adding and removing keyspace sync schema tasks. handle = background_pool.addTask( [&, this] { - String stage; - bool done_anything = false; - try - { - /// Do sync schema first, then gc. - /// They must be performed synchronously, - /// otherwise table may get mis-GC-ed if RECOVER was not properly synced caused by schema sync pause but GC runs too aggressively. - // GC safe point must be obtained ahead of syncing schema. - auto gc_safe_point = PDClientHelper::getGCSafePointWithRetry(context.getTMTContext().getPDClient()); - stage = "Sync schemas"; - done_anything = syncSchemas(); - if (done_anything) - GET_METRIC(tiflash_schema_trigger_count, type_timer).Increment(); - - stage = "GC"; - done_anything = gc(gc_safe_point); - - return done_anything; - } - catch (const Exception & e) - { - LOG_ERROR(log, "{} failed by {} \n stack : {}", stage, e.displayText(), e.getStackTrace().toString()); - } - catch (const Poco::Exception & e) - { - LOG_ERROR(log, "{} failed by {}", stage, e.displayText()); - } - catch (const std::exception & e) - { - LOG_ERROR(log, "{} failed by {}", stage, e.what()); - } + addKeyspaceGCTasks(); + removeKeyspaceGCTasks(); + return false; }, false); } +void SchemaSyncService::addKeyspaceGCTasks() +{ + auto keyspaces = context.getTMTContext().getStorages().getAllKeyspaces(); + std::unique_lock lock(ks_map_mutex); + + // Add new sync schema task for new keyspace. + for (auto const iter : keyspaces) + { + auto ks = iter.first; + if (!ks_handle_map.count(ks)) + { + auto ks_log = log->getChild(fmt::format("keyspace={}", ks)); + LOG_INFO(ks_log, "add sync schema task"); + auto task_handle = background_pool.addTask( + [&, this, ks, ks_log] { + String stage; + bool done_anything = false; + try + { + LOG_DEBUG(ks_log, "auto sync schema", ks); + /// Do sync schema first, then gc. + /// They must be performed synchronously, + /// otherwise table may get mis-GC-ed if RECOVER was not properly synced caused by schema sync pause but GC runs too aggressively. + // GC safe point must be obtained ahead of syncing schema. + auto gc_safe_point = PDClientHelper::getGCSafePointWithRetry(context.getTMTContext().getPDClient()); + stage = "Sync schemas"; + done_anything = syncSchemas(ks); + if (done_anything) + GET_METRIC(tiflash_schema_trigger_count, type_timer).Increment(); + + stage = "GC"; + done_anything = gc(gc_safe_point, ks); + + return done_anything; + } + catch (const Exception & e) + { + LOG_ERROR(ks_log, "{} failed by {} \n stack : {}", stage, e.displayText(), e.getStackTrace().toString()); + } + catch (const Poco::Exception & e) + { + LOG_ERROR(ks_log, "{} failed by {}", stage, e.displayText()); + } + catch (const std::exception & e) + { + LOG_ERROR(ks_log, "{} failed by {}", stage, e.what()); + } + return false; + }, + false); + + ks_handle_map.emplace(ks, task_handle); + } + } +} + +void SchemaSyncService::removeKeyspaceGCTasks() +{ + auto keyspaces = context.getTMTContext().getStorages().getAllKeyspaces(); + std::unique_lock lock(ks_map_mutex); + + // Remove stale sync schema task. + for (auto const & [ks, task_handle] : ks_handle_map) + { + if (!keyspaces.count(ks)) + { + auto ks_log = log->getChild(fmt::format("keyspace={}", ks)); + LOG_INFO(ks_log, "remove sync schema task"); + ks_handle_map.erase(ks); + background_pool.removeTask(task_handle); + } + } +} + SchemaSyncService::~SchemaSyncService() { background_pool.removeTask(handle); + for (auto const & iter : ks_handle_map) + { + auto task_handle = iter.second; + background_pool.removeTask(task_handle); + } } -bool SchemaSyncService::syncSchemas() +bool SchemaSyncService::syncSchemas(KeyspaceID keyspace_id) { - return context.getTMTContext().getSchemaSyncer()->syncSchemas(context); + return context.getTMTContext().getSchemaSyncer()->syncSchemas(context, keyspace_id); } template @@ -91,13 +143,15 @@ inline bool isSafeForGC(const DatabaseOrTablePtr & ptr, Timestamp gc_safe_point) return ptr->isTombstone() && ptr->getTombstone() < gc_safe_point; } -bool SchemaSyncService::gc(Timestamp gc_safe_point) +bool SchemaSyncService::gc(Timestamp gc_safe_point, KeyspaceID keyspace_id) { auto & tmt_context = context.getTMTContext(); if (gc_safe_point == gc_context.last_gc_safe_point) return false; - LOG_INFO(log, "Performing GC using safe point {}", gc_safe_point); + auto ks_log = log->getChild(fmt::format("keyspace={}", keyspace_id)); + + LOG_INFO(ks_log, "Performing GC using safe point {}", gc_safe_point); // The storages that are ready for gc std::vector> storages_to_gc; @@ -105,6 +159,9 @@ bool SchemaSyncService::gc(Timestamp gc_safe_point) auto dbs = context.getDatabases(); for (const auto & iter : dbs) { + auto db_ks_id = SchemaNameMapper::getMappedNameKeyspaceID(iter.first); + if (db_ks_id != keyspace_id) + continue; const auto & db = iter.second; for (auto table_iter = db->getIterator(context); table_iter->isValid(); table_iter->next()) { @@ -140,7 +197,7 @@ bool SchemaSyncService::gc(Timestamp gc_safe_point) return db_info ? SchemaNameMapper().debugCanonicalName(*db_info, table_info) : "(" + database_name + ")." + SchemaNameMapper().debugTableName(table_info); }(); - LOG_INFO(log, "Physically dropping table {}", canonical_name); + LOG_INFO(ks_log, "Physically dropping table {}", canonical_name); auto drop_query = std::make_shared(); drop_query->database = std::move(database_name); drop_query->table = std::move(table_name); @@ -151,7 +208,7 @@ bool SchemaSyncService::gc(Timestamp gc_safe_point) { InterpreterDropQuery drop_interpreter(ast_drop_query, context); drop_interpreter.execute(); - LOG_INFO(log, "Physically dropped table {}", canonical_name); + LOG_INFO(ks_log, "Physically dropped table {}", canonical_name); } catch (DB::Exception & e) { @@ -162,7 +219,7 @@ bool SchemaSyncService::gc(Timestamp gc_safe_point) err_msg = "locking attempt has timed out!"; // ignore verbose stack for this error else err_msg = getCurrentExceptionMessage(true); - LOG_INFO(log, "Physically drop table {} is skipped, reason: {}", canonical_name, err_msg); + LOG_INFO(ks_log, "Physically drop table {} is skipped, reason: {}", canonical_name, err_msg); } } storages_to_gc.clear(); @@ -171,7 +228,8 @@ bool SchemaSyncService::gc(Timestamp gc_safe_point) for (const auto & iter : dbs) { const auto & db = iter.second; - if (!isSafeForGC(db, gc_safe_point)) + auto ks_db_id = SchemaNameMapper::getMappedNameKeyspaceID(iter.first); + if (!isSafeForGC(db, gc_safe_point) || ks_db_id != keyspace_id) continue; const auto & db_name = iter.first; @@ -182,11 +240,11 @@ bool SchemaSyncService::gc(Timestamp gc_safe_point) { // There should be something wrong, maybe a read lock of a table is held for a long time. // Just ignore and try to collect this database next time. - LOG_INFO(log, "Physically drop database {} is skipped, reason: {} tables left", db_name, num_tables); + LOG_INFO(ks_log, "Physically drop database {} is skipped, reason: {} tables left", db_name, num_tables); continue; } - LOG_INFO(log, "Physically dropping database {}", db_name); + LOG_INFO(ks_log, "Physically dropping database {}", db_name); auto drop_query = std::make_shared(); drop_query->database = db_name; drop_query->if_exists = true; @@ -196,7 +254,7 @@ bool SchemaSyncService::gc(Timestamp gc_safe_point) { InterpreterDropQuery drop_interpreter(ast_drop_query, context); drop_interpreter.execute(); - LOG_INFO(log, "Physically dropped database {}", db_name); + LOG_INFO(ks_log, "Physically dropped database {}", db_name); } catch (DB::Exception & e) { @@ -206,19 +264,19 @@ bool SchemaSyncService::gc(Timestamp gc_safe_point) err_msg = "locking attempt has timed out!"; // ignore verbose stack for this error else err_msg = getCurrentExceptionMessage(true); - LOG_INFO(log, "Physically drop database {} is skipped, reason: {}", db_name, err_msg); + LOG_INFO(ks_log, "Physically drop database {} is skipped, reason: {}", db_name, err_msg); } } if (succeeded) { gc_context.last_gc_safe_point = gc_safe_point; - LOG_INFO(log, "Performed GC using safe point {}", gc_safe_point); + LOG_INFO(ks_log, "Performed GC using safe point {}", gc_safe_point); } else { // Don't update last_gc_safe_point and retry later - LOG_INFO(log, "Performed GC using safe point {} meet error, will try again later", gc_safe_point); + LOG_INFO(ks_log, "Performed GC using safe point {} meet error, will try again later", gc_safe_point); } return true; diff --git a/dbms/src/TiDB/Schema/SchemaSyncService.h b/dbms/src/TiDB/Schema/SchemaSyncService.h index 6a015b1bef3..6ff5d3c7904 100644 --- a/dbms/src/TiDB/Schema/SchemaSyncService.h +++ b/dbms/src/TiDB/Schema/SchemaSyncService.h @@ -19,6 +19,8 @@ #include #include +#include +#include namespace DB { @@ -42,14 +44,17 @@ class SchemaSyncService ~SchemaSyncService(); private: - bool syncSchemas(); + bool syncSchemas(KeyspaceID keyspace_id); struct GCContext { Timestamp last_gc_safe_point = 0; } gc_context; - bool gc(Timestamp gc_safe_point); + bool gc(Timestamp gc_safe_point, KeyspaceID keyspace_id); + + void addKeyspaceGCTasks(); + void removeKeyspaceGCTasks(); private: Context & context; @@ -59,6 +64,10 @@ class SchemaSyncService BackgroundProcessingPool & background_pool; BackgroundProcessingPool::TaskHandle handle; + mutable std::shared_mutex ks_map_mutex; + // Handles for each keyspace schema sync task. + std::unordered_map ks_handle_map; + LoggerPtr log; }; diff --git a/dbms/src/TiDB/Schema/SchemaSyncer.h b/dbms/src/TiDB/Schema/SchemaSyncer.h index d0c97ce1a71..4c4d41d978a 100644 --- a/dbms/src/TiDB/Schema/SchemaSyncer.h +++ b/dbms/src/TiDB/Schema/SchemaSyncer.h @@ -40,13 +40,13 @@ class SchemaSyncer /** * Get current version of CH schema. */ - virtual Int64 getCurrentVersion() = 0; + virtual Int64 getCurrentVersion(KeyspaceID keyspace_id) = 0; /** * Synchronize all schemas between TiDB and CH. * @param context */ - virtual bool syncSchemas(Context & context) = 0; + virtual bool syncSchemas(Context & context, KeyspaceID keyspace_id) = 0; virtual void reset() = 0; @@ -54,7 +54,7 @@ class SchemaSyncer virtual TiDB::DBInfoPtr getDBInfoByMappedName(const String & mapped_database_name) = 0; - virtual std::vector fetchAllDBs() = 0; + virtual std::vector fetchAllDBs(KeyspaceID keyspace_id) = 0; }; using SchemaSyncerPtr = std::shared_ptr; diff --git a/dbms/src/TiDB/Schema/TiDBSchemaSyncer.h b/dbms/src/TiDB/Schema/TiDBSchemaSyncer.h index 5a5c31c600c..f33beaa9361 100644 --- a/dbms/src/TiDB/Schema/TiDBSchemaSyncer.h +++ b/dbms/src/TiDB/Schema/TiDBSchemaSyncer.h @@ -34,6 +34,8 @@ namespace ErrorCodes extern const int FAIL_POINT_ERROR; }; +using SchemaVerMap = std::unordered_map; + template struct TiDBSchemaSyncer : public SchemaSyncer { @@ -45,23 +47,22 @@ struct TiDBSchemaSyncer : public SchemaSyncer static constexpr Int64 maxNumberOfDiffs = 100; - Int64 cur_version; + SchemaVerMap cur_versions; std::mutex schema_mutex; - std::unordered_map databases; + KeyspaceDatabaseMap databases; LoggerPtr log; explicit TiDBSchemaSyncer(KVClusterPtr cluster_) : cluster(std::move(cluster_)) - , cur_version(0) , log(Logger::get()) {} bool isTooOldSchema(Int64 cur_ver, Int64 new_version) { return cur_ver == 0 || new_version - cur_ver > maxNumberOfDiffs; } - Getter createSchemaGetter() + Getter createSchemaGetter(KeyspaceID keyspace_id) { [[maybe_unused]] auto tso = cluster->pd_client->getTS(); if constexpr (mock_getter) @@ -70,7 +71,7 @@ struct TiDBSchemaSyncer : public SchemaSyncer } else { - return Getter(cluster.get(), tso); + return Getter(cluster.get(), tso, keyspace_id); } } @@ -83,26 +84,31 @@ struct TiDBSchemaSyncer : public SchemaSyncer std::lock_guard lock(schema_mutex); databases.clear(); - cur_version = 0; + cur_versions.clear(); } - std::vector fetchAllDBs() override + std::vector fetchAllDBs(KeyspaceID keyspace_id) override { - auto getter = createSchemaGetter(); + auto getter = createSchemaGetter(keyspace_id); return getter.listDBs(); } - Int64 getCurrentVersion() override + Int64 getCurrentVersion(KeyspaceID keyspace_id) override { std::lock_guard lock(schema_mutex); - return cur_version; + auto it = cur_versions.find(keyspace_id); + if (it == cur_versions.end()) + return 0; + return it->second; } - bool syncSchemas(Context & context) override + bool syncSchemas(Context & context, KeyspaceID keyspace_id) override { std::lock_guard lock(schema_mutex); + auto ks_log = log->getChild(fmt::format("keyspace={}", keyspace_id)); + auto cur_version = cur_versions.try_emplace(keyspace_id, 0).first->second; + auto getter = createSchemaGetter(keyspace_id); - auto getter = createSchemaGetter(); Int64 version = getter.getVersion(); if (version <= cur_version) { @@ -111,7 +117,7 @@ struct TiDBSchemaSyncer : public SchemaSyncer Stopwatch watch; SCOPE_EXIT({ GET_METRIC(tiflash_schema_apply_duration_seconds).Observe(watch.elapsedSeconds()); }); - LOG_INFO(log, "Start to sync schemas. current version is: {} and try to sync schema version to: {}", cur_version, version); + LOG_INFO(ks_log, "Start to sync schemas. current version is: {} and try to sync schema version to: {}", cur_version, version); // Show whether the schema mutex is held for a long time or not. GET_METRIC(tiflash_schema_applying).Set(1.0); @@ -126,14 +132,15 @@ struct TiDBSchemaSyncer : public SchemaSyncer // Since TiDB can not make sure the schema diff of the latest schema version X is not empty, under this situation we should set the `cur_version` // to X-1 and try to fetch the schema diff X next time. Int64 version_after_load_diff = 0; - if (version_after_load_diff = tryLoadSchemaDiffs(getter, version, context); version_after_load_diff == -1) + if (version_after_load_diff = tryLoadSchemaDiffs(getter, cur_version, version, context, ks_log); version_after_load_diff == -1) { GET_METRIC(tiflash_schema_apply_count, type_full).Increment(); version_after_load_diff = loadAllSchema(getter, version, context); } - cur_version = version_after_load_diff; + cur_versions[keyspace_id] = version_after_load_diff; + // TODO: (keyspace) attach keyspace id to the metrics. GET_METRIC(tiflash_schema_version).Set(cur_version); - LOG_INFO(log, "End sync schema, version has been updated to {}{}", cur_version, cur_version == version ? "" : "(latest diff is empty)"); + LOG_INFO(ks_log, "End sync schema, version has been updated to {}{}", keyspace_id, cur_version, cur_version == version ? "" : "(latest diff is empty)"); return true; } @@ -161,15 +168,15 @@ struct TiDBSchemaSyncer : public SchemaSyncer // - if latest schema diff is not empty, return the (latest_version) // - if latest schema diff is empty, return the (latest_version - 1) // - if schema_diff.regenerate_schema_map == true, need reload all schema info from TiKV, return (-1) - // - if error happend, return (-1) - Int64 tryLoadSchemaDiffs(Getter & getter, Int64 latest_version, Context & context) + // - if error happens, return (-1) + Int64 tryLoadSchemaDiffs(Getter & getter, Int64 cur_version, Int64 latest_version, Context & context, const LoggerPtr & ks_log) { if (isTooOldSchema(cur_version, latest_version)) { return -1; } - LOG_DEBUG(log, "Try load schema diffs."); + LOG_DEBUG(ks_log, "Try load schema diffs."); Int64 used_version = cur_version; // First get all schema diff from `cur_version` to `latest_version`. Only apply the schema diff(s) if we fetch all @@ -180,12 +187,12 @@ struct TiDBSchemaSyncer : public SchemaSyncer used_version++; diffs.push_back(getter.getSchemaDiff(used_version)); } - LOG_DEBUG(log, "End load schema diffs with total {} entries.", diffs.size()); + LOG_DEBUG(ks_log, "End load schema diffs with total {} entries.", diffs.size()); if (diffs.empty()) { - LOG_WARNING(log, "Schema Diff is empty."); + LOG_WARNING(ks_log, "Schema Diff is empty."); return -1; } // Since the latest schema diff may be empty, and schemaBuilder may need to update the latest version for storageDeltaMerge, @@ -242,19 +249,19 @@ struct TiDBSchemaSyncer : public SchemaSyncer throw; } GET_METRIC(tiflash_schema_apply_count, type_failed).Increment(); - LOG_WARNING(log, "apply diff meets exception : {} \n stack is {}", e.displayText(), e.getStackTrace().toString()); + LOG_WARNING(ks_log, "apply diff meets exception : {} \n stack is {}", e.displayText(), e.getStackTrace().toString()); return -1; } catch (Poco::Exception & e) { GET_METRIC(tiflash_schema_apply_count, type_failed).Increment(); - LOG_WARNING(log, "apply diff meets exception : {}", e.displayText()); + LOG_WARNING(ks_log, "apply diff meets exception : {}", e.displayText()); return -1; } catch (std::exception & e) { GET_METRIC(tiflash_schema_apply_count, type_failed).Increment(); - LOG_WARNING(log, "apply diff meets exception : {}", e.what()); + LOG_WARNING(ks_log, "apply diff meets exception : {}", e.what()); return -1; } diff --git a/etc/config-template.toml b/etc/config-template.toml index d46bb632e0c..82cdfab1ba0 100644 --- a/etc/config-template.toml +++ b/etc/config-template.toml @@ -30,6 +30,10 @@ ## The storage format version in storage engine. Valid values: 1, 2. ## format_version = 2 +## The storage api-version for the TiFlash engine. Valid values: 1, 2, default is 1. +## while using api-version = 2, multiple keyspace data could be stored in a single storage. +## api_version = 1 + ## If there are multiple SSD disks on the machine, ## specify the path list on `storage.main.dir` can improve TiFlash performance.