Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

support keyspace feature #7039

Closed
wants to merge 35 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
b948eec
tiflash support keyspace feature
iosmanthus Feb 14, 2023
5907a5b
Merge branch 'master' of github.com:pingcap/tiflash into keyspace
iosmanthus Feb 14, 2023
a82b8ad
remove IDAsPathUpgrader and format files
iosmanthus Feb 14, 2023
cb87b21
add license header
iosmanthus Feb 14, 2023
381e3e6
Merge branch 'master' into keyspace
JaySon-Huang Feb 15, 2023
bdf4629
better exception check
JaySon-Huang Feb 16, 2023
54dff9e
Merge remote-tracking branch 'upstream/master' into keyspace
JaySon-Huang Feb 16, 2023
43b67c2
Format files
JaySon-Huang Feb 16, 2023
25a5843
Fix compile error
JaySon-Huang Feb 16, 2023
264e1db
attach keyspace_id for tiflash cop/mpp requests
iosmanthus Feb 20, 2023
62a18bb
Sync keyspace schema in exclusive task
yongman Feb 21, 2023
7b9c63d
Fix lock type in schema sync
yongman Feb 21, 2023
47400e1
Split lambda to functions
yongman Feb 28, 2023
58c9ee8
Merge pull request #1 from yongman/optimize-schema-sync
iosmanthus Mar 3, 2023
bd877c4
Merge branch 'master' of github.com:pingcap/tiflash into keyspace
iosmanthus Mar 7, 2023
c66f1a1
Merge branch 'master' of github.com:pingcap/tiflash into keyspace
iosmanthus Mar 7, 2023
990a7ec
format
iosmanthus Mar 7, 2023
32c2e2f
manual compact request support keyspace
iosmanthus Mar 7, 2023
afc4912
address comments from @JaySon-Huang
iosmanthus Mar 8, 2023
fc73d3c
add default value for table_id and keyspace_id in RegionRangeKeys
iosmanthus Mar 8, 2023
bd475c7
Small log fixes
JaySon-Huang Mar 9, 2023
6e2efa5
Small log fixes
JaySon-Huang Mar 9, 2023
e5c527d
preserve old schema sync api
iosmanthus Mar 9, 2023
a9f53be
address comments from @JaySon-Huang
iosmanthus Mar 9, 2023
d9a51db
use std::string_view instead of char[4] for KEYSPACE_PREFIX
iosmanthus Mar 9, 2023
96f4445
refine comments
iosmanthus Mar 9, 2023
debd47d
Merge branch 'master' of github.com:pingcap/tiflash into keyspace
iosmanthus Mar 9, 2023
5ab215d
Merge branch 'master' into keyspace
iosmanthus Mar 9, 2023
57ccc1e
bump client-c to latest commit of api-v2-for-release-6.6
iosmanthus Mar 9, 2023
7f23c28
Merge branch 'master' of github.com:pingcap/tiflash into keyspace
iosmanthus Mar 9, 2023
08925c4
update client-c and kvproto's commit and add api_version docs in exam…
iosmanthus Mar 10, 2023
33933eb
Merge branch 'master' of github.com:pingcap/tiflash into keyspace
iosmanthus Mar 10, 2023
bdb71ca
update client-c to master
iosmanthus Mar 10, 2023
9c193ac
git checkout master contrib/tiflash-proxy
iosmanthus Mar 10, 2023
271ef98
Merge branch 'master' into support-keyspace
iosmanthus Mar 10, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion dbms/src/Common/RedactHelpers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ void Redact::setRedactLog(bool v)
Redact::REDACT_LOG.store(v, std::memory_order_relaxed);
}

std::string Redact::handleToDebugString(DB::HandleID handle)
std::string Redact::handleToDebugString(int64_t handle)
{
if (Redact::REDACT_LOG.load(std::memory_order_relaxed))
return "?";
Expand Down
4 changes: 1 addition & 3 deletions dbms/src/Common/RedactHelpers.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@

#pragma once

#include <Storages/Transaction/Types.h>

#include <atomic>
#include <ostream>

Expand All @@ -29,7 +27,7 @@ class Redact
public:
static void setRedactLog(bool v);

static std::string handleToDebugString(DB::HandleID handle);
static std::string handleToDebugString(int64_t handle);
static std::string keyToDebugString(const char * key, size_t size);

static std::string keyToHexString(const char * key, size_t size);
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Common/tests/gtest_redact.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,12 @@ namespace DB
{
namespace tests
{
TEST(RedactLog_test, Basic)
TEST(RedactLogTest, Basic)
{
const char * test_key = "\x01\x0a\xff";
const size_t key_sz = strlen(test_key);

const DB::HandleID test_handle = 10009;
const /*DB::HandleID*/ Int64 test_handle = 10009;

Redact::setRedactLog(false);
EXPECT_EQ(Redact::keyToDebugString(test_key, key_sz), "010AFF");
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Debug/MockRaftStoreProxy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -660,7 +660,7 @@ TableID MockRaftStoreProxy::bootstrap_table(
UInt64 table_id = MockTiDB::instance().newTable("d", "t" + toString(random()), columns, tso, "", "dt");

auto schema_syncer = tmt.getSchemaSyncer();
schema_syncer->syncSchemas(ctx);
schema_syncer->syncSchemas(ctx, NullspaceID);
this->table_id = table_id;
return table_id;
}
Expand Down
2 changes: 2 additions & 0 deletions dbms/src/Debug/MockSchemaGetter.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ struct MockSchemaGetter
}
return res;
}

KeyspaceID getKeyspaceID() const { return NullspaceID; }
};

} // namespace DB
8 changes: 4 additions & 4 deletions dbms/src/Debug/MockTiDB.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,9 @@ TablePtr MockTiDB::dropTableInternal(Context & context, const String & database_
tables_by_id.erase(partition.id);
if (drop_regions)
{
for (auto & e : region_table.getRegionsByTable(partition.id))
for (auto & e : region_table.getRegionsByTable(NullspaceID, partition.id))
kvstore->mockRemoveRegion(e.first, region_table);
region_table.removeTable(partition.id);
region_table.removeTable(NullspaceID, partition.id);
}
}
}
Expand All @@ -94,9 +94,9 @@ TablePtr MockTiDB::dropTableInternal(Context & context, const String & database_

if (drop_regions)
{
for (auto & e : region_table.getRegionsByTable(table->id()))
for (auto & e : region_table.getRegionsByTable(NullspaceID, table->id()))
kvstore->mockRemoveRegion(e.first, region_table);
region_table.removeTable(table->id());
region_table.removeTable(NullspaceID, table->id());
}

return table;
Expand Down
5 changes: 3 additions & 2 deletions dbms/src/Debug/dbgFuncMockRaftSnapshot.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -514,6 +514,7 @@ static GlobalRegionMap GLOBAL_REGION_MAP;
/// Pre-decode region data into block cache and remove committed data from `region`
RegionPtrWithBlock::CachePtr GenRegionPreDecodeBlockData(const RegionPtr & region, Context & context)
{
auto keyspace_id = region->getKeyspaceID();
const auto & tmt = context.getTMTContext();
{
Timestamp gc_safe_point = 0;
Expand Down Expand Up @@ -556,7 +557,7 @@ RegionPtrWithBlock::CachePtr GenRegionPreDecodeBlockData(const RegionPtr & regio

const auto atomic_decode = [&](bool force_decode) -> bool {
Stopwatch watch;
auto storage = tmt.getStorages().get(table_id);
auto storage = tmt.getStorages().get(keyspace_id, table_id);
if (storage == nullptr || storage->isTombstone())
{
if (!force_decode) // Need to update.
Expand Down Expand Up @@ -594,7 +595,7 @@ RegionPtrWithBlock::CachePtr GenRegionPreDecodeBlockData(const RegionPtr & regio

if (!atomic_decode(false))
{
tmt.getSchemaSyncer()->syncSchemas(context);
tmt.getSchemaSyncer()->syncSchemas(context, keyspace_id);

if (!atomic_decode(true))
throw Exception("Pre-decode " + region->toString() + " cache to table " + std::to_string(table_id) + " block failed",
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Debug/dbgFuncSchema.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ void dbgFuncRefreshSchemas(Context & context, const ASTs &, DBGInvoker::Printer
auto schema_syncer = tmt.getSchemaSyncer();
try
{
schema_syncer->syncSchemas(context);
schema_syncer->syncSchemas(context, NullspaceID);
}
catch (Exception & e)
{
Expand Down Expand Up @@ -95,7 +95,7 @@ void dbgFuncGcSchemas(Context & context, const ASTs & args, DBGInvoker::Printer
gc_safe_point = PDClientHelper::getGCSafePointWithRetry(context.getTMTContext().getPDClient());
else
gc_safe_point = safeGet<Timestamp>(typeid_cast<const ASTLiteral &>(*args[0]).value);
service->gc(gc_safe_point);
service->gc(gc_safe_point, NullspaceID);

output("schemas gc done");
}
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Debug/dbgFuncSchemaName.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ void dbgFuncGetPartitionTablesTiflashReplicaCount(Context & context, const ASTs
for (const auto & part_def : table_info.partition.definitions)
{
auto paritition_table_info = table_info.producePartitionTableInfo(part_def.id, name_mapper);
auto partition_storage = context.getTMTContext().getStorages().get(paritition_table_info->id);
auto partition_storage = context.getTMTContext().getStorages().get(NullspaceID, paritition_table_info->id);
fmt_buf.append((std::to_string(partition_storage->getTableInfo().replica_info.count)));
fmt_buf.append("/");
}
Expand Down
6 changes: 3 additions & 3 deletions dbms/src/Debug/dbgNaturalDag.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ void NaturalDag::loadTables(const NaturalDag::JSONObjectPtr & obj)
table.id = id;
auto tbl_json = td_json->getObject(std::to_string(id));
auto meta_json = tbl_json->getObject(TABLE_META);
table.meta = TiDB::TableInfo(meta_json);
table.meta = TiDB::TableInfo(meta_json, NullspaceID);
auto regions_json = tbl_json->getArray(TABLE_REGIONS);
for (const auto & region_json : *regions_json)
{
Expand Down Expand Up @@ -208,7 +208,7 @@ void NaturalDag::buildTables(Context & context)
auto & table = it.second;
auto meta = table.meta;
MockTiDB::instance().addTable(db_name, std::move(meta));
schema_syncer->syncSchemas(context);
schema_syncer->syncSchemas(context, NullspaceID);
for (auto & region : table.regions)
{
metapb::Region region_pb;
Expand Down Expand Up @@ -243,7 +243,7 @@ void NaturalDag::buildDatabase(Context & context, SchemaSyncerPtr & schema_synce
MockTiDB::instance().dropDB(context, db_name, true);
}
MockTiDB::instance().newDataBase(db_name);
schema_syncer->syncSchemas(context);
schema_syncer->syncSchemas(context, NullspaceID);
}

void NaturalDag::build(Context & context)
Expand Down
10 changes: 5 additions & 5 deletions dbms/src/Debug/dbgQueryExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ BlockInputStreamPtr executeMPPQuery(Context & context, const DAGProperties & pro
for (const auto & partition : table_info->partition.definitions)
{
const auto partition_id = partition.id;
auto regions = context.getTMTContext().getRegionTable().getRegionsByTable(partition_id);
auto regions = context.getTMTContext().getRegionTable().getRegionsByTable(NullspaceID, partition_id);
for (size_t i = 0; i < regions.size(); ++i)
{
if ((current_region_size + i) % properties.mpp_partition_num != static_cast<size_t>(task.partition_id))
Expand All @@ -202,7 +202,7 @@ BlockInputStreamPtr executeMPPQuery(Context & context, const DAGProperties & pro
}
else
{
auto regions = context.getTMTContext().getRegionTable().getRegionsByTable(table_id);
auto regions = context.getTMTContext().getRegionTable().getRegionsByTable(NullspaceID, table_id);
if (regions.size() < static_cast<size_t>(properties.mpp_partition_num))
throw Exception("Not supported: table region num less than mpp partition num");
for (size_t i = 0; i < regions.size(); ++i)
Expand All @@ -229,7 +229,7 @@ BlockInputStreamPtr executeNonMPPQuery(Context & context, RegionID region_id, co
RegionPtr region;
if (region_id == InvalidRegionID)
{
auto regions = context.getTMTContext().getRegionTable().getRegionsByTable(table_id);
auto regions = context.getTMTContext().getRegionTable().getRegionsByTable(NullspaceID, table_id);
if (regions.empty())
throw Exception("No region for table", ErrorCodes::BAD_ARGUMENTS);
region = regions[0].second;
Expand Down Expand Up @@ -307,7 +307,7 @@ tipb::SelectResponse executeDAGRequest(Context & context, const tipb::DAGRequest

table_regions_info.local_regions.emplace(region_id, RegionInfo(region_id, region_version, region_conf_version, std::move(key_ranges), nullptr));

DAGContext dag_context(dag_request, std::move(tables_regions_info), "", false, log);
DAGContext dag_context(dag_request, std::move(tables_regions_info), NullspaceID, "", false, log);
context.setDAGContext(&dag_context);

DAGDriver driver(context, start_ts, DEFAULT_UNSPECIFIED_SCHEMA_VERSION, &dag_response, true);
Expand Down Expand Up @@ -335,7 +335,7 @@ bool runAndCompareDagReq(const coprocessor::Request & req, const coprocessor::Re
auto & table_regions_info = tables_regions_info.getSingleTableRegions();
table_regions_info.local_regions.emplace(region_id, RegionInfo(region_id, region->version(), region->confVer(), std::move(key_ranges), nullptr));

DAGContext dag_context(dag_request, std::move(tables_regions_info), "", false, log);
DAGContext dag_context(dag_request, std::move(tables_regions_info), NullspaceID, "", false, log);
context.setDAGContext(&dag_context);
DAGDriver driver(context, properties.start_ts, DEFAULT_UNSPECIFIED_SCHEMA_VERSION, &dag_response, true);
driver.execute();
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Debug/dbgTools.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -549,7 +549,7 @@ Int64 concurrentRangeOperate(

{
TMTContext & tmt = context.getTMTContext();
for (auto && [_, r] : tmt.getRegionTable().getRegionsByTable(table_info.id))
for (auto && [_, r] : tmt.getRegionTable().getRegionsByTable(NullspaceID, table_info.id))
{
std::ignore = _;
if (r == nullptr)
Expand Down
1 change: 1 addition & 0 deletions dbms/src/Flash/BatchCoprocessorHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ grpc::Status BatchCoprocessorHandler::execute()
DAGContext dag_context(
dag_request,
std::move(tables_regions_info),
cop_request->context().keyspace_id(),
cop_context.db_context.getClientInfo().current_address.toString(),
/*is_batch_cop=*/true,
Logger::get("BatchCoprocessorHandler"));
Expand Down
4 changes: 3 additions & 1 deletion dbms/src/Flash/Coprocessor/DAGContext.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ bool strictSqlMode(UInt64 sql_mode)
}

// for non-mpp(cop/batchCop)
DAGContext::DAGContext(const tipb::DAGRequest & dag_request_, TablesRegionsInfo && tables_regions_info_, const String & tidb_host_, bool is_batch_cop_, LoggerPtr log_)
DAGContext::DAGContext(const tipb::DAGRequest & dag_request_, TablesRegionsInfo && tables_regions_info_, KeyspaceID keyspace_id_, const String & tidb_host_, bool is_batch_cop_, LoggerPtr log_)
: dag_request(&dag_request_)
, dummy_query_string(dag_request->DebugString())
, dummy_ast(makeDummyQuery())
Expand All @@ -52,6 +52,7 @@ DAGContext::DAGContext(const tipb::DAGRequest & dag_request_, TablesRegionsInfo
, max_recorded_error_count(getMaxErrorCount(*dag_request))
, warnings(max_recorded_error_count)
, warning_count(0)
, keyspace_id(keyspace_id_)
{
RUNTIME_CHECK((dag_request->executors_size() > 0) != dag_request->has_root_executor());
const auto & root_executor = dag_request->has_root_executor()
Expand Down Expand Up @@ -79,6 +80,7 @@ DAGContext::DAGContext(const tipb::DAGRequest & dag_request_, const mpp::TaskMet
, max_recorded_error_count(getMaxErrorCount(*dag_request))
, warnings(max_recorded_error_count)
, warning_count(0)
, keyspace_id(meta_.keyspace_id())
{
RUNTIME_CHECK(dag_request->has_root_executor() && dag_request->root_executor().has_executor_id());
root_executor_id = dag_request->root_executor().executor_id();
Expand Down
6 changes: 5 additions & 1 deletion dbms/src/Flash/Coprocessor/DAGContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ class DAGContext
{
public:
// for non-mpp(cop/batchCop)
DAGContext(const tipb::DAGRequest & dag_request_, TablesRegionsInfo && tables_regions_info_, const String & tidb_host_, bool is_batch_cop_, LoggerPtr log_);
DAGContext(const tipb::DAGRequest & dag_request_, TablesRegionsInfo && tables_regions_info_, KeyspaceID keyspace_id_, const String & tidb_host_, bool is_batch_cop_, LoggerPtr log_);

// for mpp
DAGContext(const tipb::DAGRequest & dag_request_, const mpp::TaskMeta & meta_, bool is_root_mpp_task_);
Expand Down Expand Up @@ -272,6 +272,7 @@ class DAGContext

void addTableLock(const TableLockHolder & lock) { table_locks.push_back(lock); }

KeyspaceID getKeyspaceID() const { return keyspace_id; }
String getRootExecutorId();

const tipb::DAGRequest * dag_request;
Expand Down Expand Up @@ -362,6 +363,9 @@ class DAGContext
// In disaggregated tiflash mode, table_scan in tiflash_compute node will be converted ExchangeReceiver.
// Record here so we can add to receiver_set and cancel/close it.
std::optional<std::pair<String, ExchangeReceiverPtr>> disaggregated_compute_exchange_receiver;

// The keyspace that the DAG request from
const KeyspaceID keyspace_id = NullspaceID;
};

} // namespace DB
13 changes: 7 additions & 6 deletions dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -471,7 +471,7 @@ std::vector<pingcap::coprocessor::CopTask> DAGStorageInterpreter::buildCopTasks(
std::multimap<std::string, std::string> meta_data;
meta_data.emplace("is_remote_read", "true");

auto tasks = pingcap::coprocessor::buildCopTasks(bo, cluster, remote_request.key_ranges, req, store_type, &Poco::Logger::get("pingcap/coprocessor"), std::move(meta_data), [&] {
auto tasks = pingcap::coprocessor::buildCopTasks(bo, cluster, remote_request.key_ranges, req, store_type, dagContext().getKeyspaceID(), &Poco::Logger::get("pingcap/coprocessor"), std::move(meta_data), [&] {
GET_METRIC(tiflash_coprocessor_request_count, type_remote_read_sent).Increment();
});
all_tasks.insert(all_tasks.end(), tasks.begin(), tasks.end());
Expand Down Expand Up @@ -842,10 +842,11 @@ void DAGStorageInterpreter::buildLocalStreams(DAGPipeline & pipeline, size_t max

std::unordered_map<TableID, DAGStorageInterpreter::StorageWithStructureLock> DAGStorageInterpreter::getAndLockStorages(Int64 query_schema_version)
{
auto keyspace_id = context.getDAGContext()->getKeyspaceID();
std::unordered_map<TableID, DAGStorageInterpreter::StorageWithStructureLock> storages_with_lock;
if (unlikely(query_schema_version == DEFAULT_UNSPECIFIED_SCHEMA_VERSION))
{
auto logical_table_storage = tmt.getStorages().get(logical_table_id);
auto logical_table_storage = tmt.getStorages().get(keyspace_id, logical_table_id);
if (!logical_table_storage)
{
throw TiFlashException(fmt::format("Table {} doesn't exist.", logical_table_id), Errors::Table::NotExists);
Expand All @@ -855,7 +856,7 @@ std::unordered_map<TableID, DAGStorageInterpreter::StorageWithStructureLock> DAG
{
for (auto const physical_table_id : table_scan.getPhysicalTableIDs())
{
auto physical_table_storage = tmt.getStorages().get(physical_table_id);
auto physical_table_storage = tmt.getStorages().get(keyspace_id, physical_table_id);
if (!physical_table_storage)
{
throw TiFlashException(fmt::format("Table {} doesn't exist.", physical_table_id), Errors::Table::NotExists);
Expand All @@ -866,14 +867,14 @@ std::unordered_map<TableID, DAGStorageInterpreter::StorageWithStructureLock> DAG
return storages_with_lock;
}

auto global_schema_version = tmt.getSchemaSyncer()->getCurrentVersion();
auto global_schema_version = tmt.getSchemaSyncer()->getCurrentVersion(keyspace_id);

/// Align schema version under the read lock.
/// Return: [storage, table_structure_lock, storage_schema_version, ok]
auto get_and_lock_storage = [&](bool schema_synced, TableID table_id) -> std::tuple<ManageableStoragePtr, TableStructureLockHolder, Int64, bool> {
/// Get storage in case it's dropped then re-created.
// If schema synced, call getTable without try, leading to exception on table not existing.
auto table_store = tmt.getStorages().get(table_id);
auto table_store = tmt.getStorages().get(keyspace_id, table_id);
if (!table_store)
{
if (schema_synced)
Expand Down Expand Up @@ -968,7 +969,7 @@ std::unordered_map<TableID, DAGStorageInterpreter::StorageWithStructureLock> DAG
auto sync_schema = [&] {
auto start_time = Clock::now();
GET_METRIC(tiflash_schema_trigger_count, type_cop_read).Increment();
tmt.getSchemaSyncer()->syncSchemas(context);
tmt.getSchemaSyncer()->syncSchemas(context, dagContext().getKeyspaceID());
auto schema_sync_cost = std::chrono::duration_cast<std::chrono::milliseconds>(Clock::now() - start_time).count();
LOG_INFO(log, "Table {} schema sync cost {}ms.", logical_table_id, schema_sync_cost);
};
Expand Down
1 change: 1 addition & 0 deletions dbms/src/Flash/CoprocessorHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ grpc::Status CoprocessorHandler::execute()
DAGContext dag_context(
dag_request,
std::move(tables_regions_info),
cop_request->context().keyspace_id(),
cop_context.db_context.getClientInfo().current_address.toString(),
/*is_batch_cop=*/false,
Logger::get("CoprocessorHandler"));
Expand Down
10 changes: 6 additions & 4 deletions dbms/src/Flash/Management/ManualCompact.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,12 @@ ManualCompactManager::ManualCompactManager(const Context & global_context_, cons

grpc::Status ManualCompactManager::handleRequest(const ::kvrpcpb::CompactRequest * request, ::kvrpcpb::CompactResponse * response)
{
auto ks_tbl_id = KeyspaceTableID{request->keyspace_id(), request->logical_table_id()};
{
std::lock_guard lock(mutex);

// Check whether there are duplicated executions.
if (unsync_active_logical_table_ids.count(request->logical_table_id()))
if (unsync_active_logical_table_ids.count(ks_tbl_id))
{
response->mutable_error()->mutable_err_compact_in_progress();
response->set_has_remaining(false);
Expand All @@ -57,12 +58,12 @@ grpc::Status ManualCompactManager::handleRequest(const ::kvrpcpb::CompactRequest
return grpc::Status::OK;
}

unsync_active_logical_table_ids.insert(request->logical_table_id());
unsync_active_logical_table_ids.insert(ks_tbl_id);
unsync_running_or_pending_tasks++;
}
SCOPE_EXIT({
std::lock_guard lock(mutex);
unsync_active_logical_table_ids.erase(request->logical_table_id());
unsync_active_logical_table_ids.erase(ks_tbl_id);
unsync_running_or_pending_tasks--;
});

Expand Down Expand Up @@ -100,7 +101,7 @@ grpc::Status ManualCompactManager::doWorkWithCatch(const ::kvrpcpb::CompactReque
grpc::Status ManualCompactManager::doWork(const ::kvrpcpb::CompactRequest * request, ::kvrpcpb::CompactResponse * response)
{
const auto & tmt_context = global_context.getTMTContext();
auto storage = tmt_context.getStorages().get(request->physical_table_id());
auto storage = tmt_context.getStorages().get(request->keyspace_id(), request->physical_table_id());
if (storage == nullptr)
{
response->mutable_error()->mutable_err_physical_table_not_exist();
Expand Down Expand Up @@ -162,6 +163,7 @@ grpc::Status ManualCompactManager::doWork(const ::kvrpcpb::CompactRequest * requ

Stopwatch timer;

// TODO(iosmanthus): attach keyspace id for this logger.
LOG_INFO(log, "Manual compaction begin for table {}, start_key = {}", request->physical_table_id(), start_key.toDebugString());

// Repeatedly merge multiple segments as much as possible.
Expand Down
Loading