Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support keyspace requests #123

Merged
merged 13 commits into from
Mar 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 16 additions & 1 deletion include/pingcap/Config.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,13 @@
#pragma once

#pragma GCC diagnostic push
#ifdef __clang__
#pragma GCC diagnostic ignored "-Wdeprecated-declarations"
#endif
#include <grpcpp/security/credentials.h>
#pragma GCC diagnostic pop

#include <kvproto/kvrpcpb.pb.h>

#include <fstream>
#include <streambuf>
Expand All @@ -14,15 +22,22 @@ struct ClusterConfig
std::string ca_path;
std::string cert_path;
std::string key_path;
::kvrpcpb::APIVersion api_version = ::kvrpcpb::APIVersion::V1;

ClusterConfig() = default;

ClusterConfig(const std::string & engine_key_, const std::string & engine_value_, const std::string & ca_path_, const std::string & cert_path_, const std::string & key_path_)
ClusterConfig(const std::string & engine_key_,
const std::string & engine_value_,
const std::string & ca_path_,
const std::string & cert_path_,
const std::string & key_path_,
const ::kvrpcpb::APIVersion & api_version_)
: tiflash_engine_key(engine_key_)
, tiflash_engine_value(engine_value_)
, ca_path(ca_path_)
, cert_path(cert_path_)
, key_path(key_path_)
, api_version(api_version_)
{}

bool hasTlsConfig() const { return !ca_path.empty(); }
Expand Down
4 changes: 3 additions & 1 deletion include/pingcap/Exception.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@ enum ErrorCodes : int
CoprocessorError = 15,
TxnNotFound = 16,
NonAsyncCommit = 17,
UnknownError = 18
KeyspaceNotEnabled = 18,
InternalError = 19,
UnknownError = 20
};

class Exception : public Poco::Exception
Expand Down
2 changes: 2 additions & 0 deletions include/pingcap/coprocessor/Client.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ struct CopTask
kv::GRPCMetaData meta_data;
// call before send request, can be used to collect TiFlash metrics.
std::function<void()> before_send;
pd::KeyspaceID keyspace_id;
};

struct RegionInfo
Expand Down Expand Up @@ -233,6 +234,7 @@ std::vector<CopTask> buildCopTasks(
KeyRanges ranges,
RequestPtr cop_req,
kv::StoreType store_type,
pd::KeyspaceID keyspace_id,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add some comments about why we need keyspace_id in buildCopTasks but not need for buildBatchCopTasks?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

Logger * log,
kv::GRPCMetaData meta_data = {},
std::function<void()> before_send = {});
Expand Down
3 changes: 3 additions & 0 deletions include/pingcap/kv/Cluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ struct Cluster

LockResolverPtr lock_resolver;

::kvrpcpb::APIVersion api_version = ::kvrpcpb::APIVersion::V1;

Cluster()
: pd_client(std::make_shared<pd::MockPDClient>())
, rpc_client(std::make_unique<RpcClient>())
Expand All @@ -36,6 +38,7 @@ struct Cluster
, rpc_client(std::make_unique<RpcClient>(config))
, oracle(std::make_unique<pd::Oracle>(pd_client, std::chrono::milliseconds(oracle_update_interval)))
, lock_resolver(std::make_unique<LockResolver>(this))
, api_version(config.api_version)
{}

void update(const std::vector<std::string> & pd_addrs, const ClusterConfig & config) const
Expand Down
2 changes: 1 addition & 1 deletion include/pingcap/kv/RegionClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ struct RegionClient
throw Exception("Region epoch not match after retries: Region " + region_id.toString() + " not in region cache.", RegionEpochNotMatch);
}
const auto & store_addr = ctx->addr;
rpc.setCtx(ctx);
rpc.setCtx(ctx, cluster->api_version);
try
{
cluster->rpc_client->sendRequest(store_addr, rpc, timeout, meta_data);
Expand Down
19 changes: 18 additions & 1 deletion include/pingcap/kv/Rpc.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,26 @@ class RpcCall
, log(&Logger::get("pingcap.tikv"))
{}

void setCtx(RPCContextPtr rpc_ctx)
void setCtx(RPCContextPtr rpc_ctx, kvrpcpb::APIVersion api_version)
{
// TODO: attach the API version with a better manner.
// We check if the region range is in the keyspace, if it does, we use the API v2.
kvrpcpb::APIVersion req_api_ver = kvrpcpb::APIVersion::V1;
if (api_version == kvrpcpb::APIVersion::V2)
{
auto start_key = rpc_ctx->meta.start_key();
auto end_key = rpc_ctx->meta.end_key();
std::string min_raw_v2_key = {'r', 0, 0, 0};
std::string max_raw_v2_key = {'s', 0, 0, 0};
std::string min_txn_v2_key = {'x', 0, 0, 0};
std::string max_txn_v2_key = {'y', 0, 0, 0};
if ((start_key >= min_raw_v2_key && end_key < min_raw_v2_key) || (start_key >= min_txn_v2_key && end_key < max_txn_v2_key))
{
req_api_ver = kvrpcpb::APIVersion::V2;
}
}
::kvrpcpb::Context * context = req->mutable_context();
context->set_api_version(req_api_ver);
context->set_region_id(rpc_ctx->region.id);
context->set_allocated_region_epoch(new metapb::RegionEpoch(rpc_ctx->meta.region_epoch()));
context->set_allocated_peer(new metapb::Peer(rpc_ctx->peer));
Expand Down
6 changes: 6 additions & 0 deletions include/pingcap/pd/Client.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
#include <grpcpp/channel.h>
#include <grpcpp/create_channel.h>
#include <grpcpp/security/credentials.h>
#include <kvproto/keyspacepb.grpc.pb.h>
#include <kvproto/keyspacepb.pb.h>
#include <kvproto/pdpb.grpc.pb.h>
#include <pingcap/Config.h>
#include <pingcap/Log.h>
Expand Down Expand Up @@ -54,6 +56,8 @@ class Client : public IClient

uint64_t getGCSafePoint() override;

KeyspaceID getKeyspaceID(const std::string & keyspace_name) override;

bool isMock() override;

std::string getLeaderUrl() override;
Expand All @@ -75,6 +79,7 @@ class Client : public IClient
{
std::shared_ptr<grpc::Channel> channel;
std::unique_ptr<pdpb::PD::Stub> stub;
std::unique_ptr<keyspacepb::Keyspace::Stub> keyspace_stub;
PDConnClient(std::string addr, const ClusterConfig & config)
{
if (config.hasTlsConfig())
Expand All @@ -86,6 +91,7 @@ class Client : public IClient
channel = grpc::CreateChannel(addr, grpc::InsecureChannelCredentials());
}
stub = pdpb::PD::NewStub(channel);
keyspace_stub = keyspacepb::Keyspace::NewStub(channel);
}
};

Expand Down
12 changes: 12 additions & 0 deletions include/pingcap/pd/IClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,21 @@
#include <kvproto/enginepb.pb.h>
#include <kvproto/pdpb.pb.h>
#pragma GCC diagnostic pop
#include <pingcap/Config.h>

namespace pingcap
{
namespace pd
{
using KeyspaceID = uint32_t;

enum : KeyspaceID
{
// The size of KeyspaceID allocated for PD is 3 bytes.
// The NullspaceID is preserved for TiDB API V1 compatibility.
NullspaceID = 0xffffffff,
};

class IClient
{
public:
Expand All @@ -36,6 +46,8 @@ class IClient

virtual uint64_t getGCSafePoint() = 0;

virtual KeyspaceID getKeyspaceID(const std::string & keyspace_name) = 0;

virtual void update(const std::vector<std::string> & addrs, const ClusterConfig & config_) = 0;

virtual bool isMock() = 0;
Expand Down
2 changes: 2 additions & 0 deletions include/pingcap/pd/MockPDClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ class MockPDClient : public IClient

bool isClusterBootstrapped() override { return true; }

KeyspaceID getKeyspaceID(const std::string & /*keyspace_name*/) override { throw Exception("not implemented", pingcap::ErrorCodes::UnknownError); }

void update(const std::vector<std::string> & /*addrs*/, const ClusterConfig & /*config_*/) override { throw Exception("not implemented", pingcap::ErrorCodes::UnknownError); }

bool isMock() override { return true; }
Expand Down
17 changes: 13 additions & 4 deletions src/coprocessor/Client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ std::vector<CopTask> buildCopTasks(
KeyRanges ranges,
RequestPtr cop_req,
kv::StoreType store_type,
pd::KeyspaceID keyspace_id,
Logger * log,
kv::GRPCMetaData meta_data,
std::function<void()> before_send)
Expand All @@ -44,7 +45,7 @@ std::vector<CopTask> buildCopTasks(
// all ranges belong to same region.
if (i == ranges.size())
{
tasks.push_back(CopTask{loc.region, ranges, cop_req, store_type, /*partition_index=*/0, meta_data, before_send});
tasks.push_back(CopTask{loc.region, ranges, cop_req, store_type, /*partition_index=*/0, meta_data, before_send, keyspace_id});
break;
}

Expand All @@ -56,7 +57,7 @@ std::vector<CopTask> buildCopTasks(
task_ranges.push_back(KeyRange{bound.start_key, loc.end_key});
bound.start_key = loc.end_key; // update the last range start key after splitted
}
tasks.push_back(CopTask{loc.region, task_ranges, cop_req, store_type, /*partition_index=*/0, meta_data, before_send});
tasks.push_back(CopTask{loc.region, task_ranges, cop_req, store_type, /*partition_index=*/0, meta_data, before_send, keyspace_id});
ranges.erase(ranges.begin(), ranges.begin() + i);
}
log->debug("has " + std::to_string(tasks.size()) + " tasks.");
Expand Down Expand Up @@ -305,6 +306,8 @@ std::vector<BatchCopTask> balanceBatchCopTasks(kv::RegionCachePtr & cache, std::

// The elements in the two `physical_table_ids` and `ranges_for_each_physical_table` should be in one-to-one mapping.
// When build batch cop tasks for partition table, physical_table_ids.size() may be greater than 1.
// NOTE: `buildBatchCopTasks` do not need keyspace_id parameter yet since the batch tasks will be wrapped into MPP tasks
// and the keyspace_id attachment is finished before the MPP tasks are sent.
std::vector<BatchCopTask> buildBatchCopTasks(
kv::Backoffer & bo,
kv::Cluster * cluster,
Expand Down Expand Up @@ -462,6 +465,12 @@ std::vector<BatchCopTask> buildBatchCopTasks(
std::vector<CopTask> ResponseIter::handleTaskImpl(kv::Backoffer & bo, const CopTask & task)
{
auto req = std::make_shared<::coprocessor::Request>();
auto * ctx = req->mutable_context();
if (task.keyspace_id != pd::NullspaceID)
{
ctx->set_api_version(kvrpcpb::APIVersion::V2);
ctx->set_keyspace_id(task.keyspace_id);
}
req->set_tp(task.req->tp);
req->set_start_ts(task.req->start_ts);
req->set_schema_ver(task.req->schema_version);
Expand All @@ -488,7 +497,7 @@ std::vector<CopTask> ResponseIter::handleTaskImpl(kv::Backoffer & bo, const CopT
catch (Exception & e)
{
bo.backoff(kv::boRegionMiss, e);
return buildCopTasks(bo, cluster, task.ranges, task.req, task.store_type, log, task.meta_data, task.before_send);
return buildCopTasks(bo, cluster, task.ranges, task.req, task.store_type, task.keyspace_id, log, task.meta_data, task.before_send);
}
if (resp->has_locked())
{
Expand All @@ -506,7 +515,7 @@ std::vector<CopTask> ResponseIter::handleTaskImpl(kv::Backoffer & bo, const CopT
log->information("get lock and sleep for a while, sleep time is " + std::to_string(before_expired) + "ms.");
bo.backoffWithMaxSleep(kv::boTxnLockFast, before_expired, Exception(resp->locked().DebugString(), ErrorCodes::LockError));
}
return buildCopTasks(bo, cluster, task.ranges, task.req, task.store_type, log, task.meta_data, task.before_send);
return buildCopTasks(bo, cluster, task.ranges, task.req, task.store_type, task.keyspace_id, log, task.meta_data, task.before_send);
}

const std::string & err_msg = resp->other_error();
Expand Down
37 changes: 37 additions & 0 deletions src/pd/Client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -423,6 +423,43 @@ metapb::Store Client::getStore(uint64_t store_id)
return response.store();
}

KeyspaceID Client::getKeyspaceID(const std::string & keyspace_name)
{
keyspacepb::LoadKeyspaceRequest request{};
keyspacepb::LoadKeyspaceResponse response{};

request.set_allocated_header(requestHeader());
request.set_name(keyspace_name);

grpc::ClientContext context;

context.set_deadline(std::chrono::system_clock::now() + pd_timeout);

auto status = leaderClient()->keyspace_stub->LoadKeyspace(&context, request, &response);
if (!status.ok())
{
std::string err_msg = ("get keyspace id failed: " + std::to_string(status.error_code()) + ": " + status.error_message());
log->error(err_msg);
check_leader.store(true);
throw Exception(err_msg, GRPCErrorCode);
}

if (response.header().has_error())
{
std::string err_msg = ("get keyspace id failed: " + response.header().error().message());
log->error(err_msg);
throw Exception(err_msg, InternalError);
}

if (response.keyspace().state() != keyspacepb::KeyspaceState::ENABLED)
{
std::string err_msg = ("keyspace " + keyspace_name + " is not enabled");
log->error(err_msg);
throw Exception(err_msg, KeyspaceNotEnabled);
}
return response.keyspace().id();
}

bool Client::isClusterBootstrapped()
{
pdpb::IsBootstrappedRequest request{};
Expand Down
1 change: 1 addition & 0 deletions src/test/coprocessor_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ TEST_F(TestCoprocessor, testBuildTask1)
ranges,
req,
kv::StoreType::TiKV,
pd::NullspaceID,
&Logger::get("pingcap/coprocessor"));

ASSERT_EQ(tasks.size(), 2);
Expand Down