Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support keyspace requests #123

Merged
merged 13 commits into from
Mar 10, 2023
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 16 additions & 1 deletion include/pingcap/Config.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,13 @@
#pragma once

#pragma GCC diagnostic push
#ifdef __clang__
#pragma GCC diagnostic ignored "-Wdeprecated-declarations"
#endif
#include <grpcpp/security/credentials.h>
#pragma GCC diagnostic pop

#include <kvproto/kvrpcpb.pb.h>

#include <fstream>
#include <streambuf>
Expand All @@ -14,15 +22,22 @@ struct ClusterConfig
std::string ca_path;
std::string cert_path;
std::string key_path;
::kvrpcpb::APIVersion api_version = ::kvrpcpb::APIVersion::V1;

ClusterConfig() = default;

ClusterConfig(const std::string & engine_key_, const std::string & engine_value_, const std::string & ca_path_, const std::string & cert_path_, const std::string & key_path_)
ClusterConfig(const std::string & engine_key_,
const std::string & engine_value_,
const std::string & ca_path_,
const std::string & cert_path_,
const std::string & key_path_,
const ::kvrpcpb::APIVersion & api_version_)
: tiflash_engine_key(engine_key_)
, tiflash_engine_value(engine_value_)
, ca_path(ca_path_)
, cert_path(cert_path_)
, key_path(key_path_)
, api_version(api_version_)
{}

bool hasTlsConfig() const { return !ca_path.empty(); }
Expand Down
4 changes: 3 additions & 1 deletion include/pingcap/Exception.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@ enum ErrorCodes : int
CoprocessorError = 15,
TxnNotFound = 16,
NonAsyncCommit = 17,
UnknownError = 18
KeyspaceNotEnabled = 18,
InternalError = 19,
UnknownError = 20
};

class Exception : public Poco::Exception
Expand Down
2 changes: 2 additions & 0 deletions include/pingcap/coprocessor/Client.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ struct CopTask
kv::GRPCMetaData meta_data;
// call before send request, can be used to collect TiFlash metrics.
std::function<void()> before_send;
pd::KeyspaceID keyspace_id;
};

struct RegionInfo
Expand Down Expand Up @@ -233,6 +234,7 @@ std::vector<CopTask> buildCopTasks(
KeyRanges ranges,
RequestPtr cop_req,
kv::StoreType store_type,
pd::KeyspaceID keyspace_id,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add some comments about why we need keyspace_id in buildCopTasks but not need for buildBatchCopTasks?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

Logger * log,
kv::GRPCMetaData meta_data = {},
std::function<void()> before_send = {});
Expand Down
3 changes: 3 additions & 0 deletions include/pingcap/kv/Cluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ struct Cluster

LockResolverPtr lock_resolver;

::kvrpcpb::APIVersion api_version = ::kvrpcpb::APIVersion::V1;

Cluster()
: pd_client(std::make_shared<pd::MockPDClient>())
, rpc_client(std::make_unique<RpcClient>())
Expand All @@ -36,6 +38,7 @@ struct Cluster
, rpc_client(std::make_unique<RpcClient>(config))
, oracle(std::make_unique<pd::Oracle>(pd_client, std::chrono::milliseconds(oracle_update_interval)))
, lock_resolver(std::make_unique<LockResolver>(this))
, api_version(config.api_version)
{}

void update(const std::vector<std::string> & pd_addrs, const ClusterConfig & config) const
Expand Down
2 changes: 1 addition & 1 deletion include/pingcap/kv/RegionClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ struct RegionClient
throw Exception("Region epoch not match after retries: Region " + region_id.toString() + " not in region cache.", RegionEpochNotMatch);
}
const auto & store_addr = ctx->addr;
rpc.setCtx(ctx);
rpc.setCtx(ctx, cluster->api_version);
try
{
cluster->rpc_client->sendRequest(store_addr, rpc, timeout, meta_data);
Expand Down
19 changes: 18 additions & 1 deletion include/pingcap/kv/Rpc.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,26 @@ class RpcCall
, log(&Logger::get("pingcap.tikv"))
{}

void setCtx(RPCContextPtr rpc_ctx)
void setCtx(RPCContextPtr rpc_ctx, kvrpcpb::APIVersion api_version)
{
// TODO: attach the API version with a better manner.
// We check if the region range is in the keyspace, if it does, we use the API v2.
kvrpcpb::APIVersion req_api_ver = kvrpcpb::APIVersion::V1;
if (api_version == kvrpcpb::APIVersion::V2)
{
auto start_key = rpc_ctx->meta.start_key();
auto end_key = rpc_ctx->meta.end_key();
std::string min_raw_v2_key = {'r', 0, 0, 0};
std::string max_raw_v2_key = {'s', 0, 0, 0};
std::string min_txn_v2_key = {'x', 0, 0, 0};
std::string max_txn_v2_key = {'y', 0, 0, 0};
if ((start_key >= min_raw_v2_key && end_key < min_raw_v2_key) || (start_key >= min_txn_v2_key && end_key < max_txn_v2_key))
{
req_api_ver = kvrpcpb::APIVersion::V2;
}
}
::kvrpcpb::Context * context = req->mutable_context();
context->set_api_version(req_api_ver);
context->set_region_id(rpc_ctx->region.id);
context->set_allocated_region_epoch(new metapb::RegionEpoch(rpc_ctx->meta.region_epoch()));
context->set_allocated_peer(new metapb::Peer(rpc_ctx->peer));
Expand Down
6 changes: 6 additions & 0 deletions include/pingcap/pd/Client.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
#include <grpcpp/channel.h>
#include <grpcpp/create_channel.h>
#include <grpcpp/security/credentials.h>
#include <kvproto/keyspacepb.grpc.pb.h>
#include <kvproto/keyspacepb.pb.h>
#include <kvproto/pdpb.grpc.pb.h>
#include <pingcap/Config.h>
#include <pingcap/Log.h>
Expand Down Expand Up @@ -54,6 +56,8 @@ class Client : public IClient

uint64_t getGCSafePoint() override;

KeyspaceID getKeyspaceID(const std::string & keyspace_name) override;

bool isMock() override;

std::string getLeaderUrl() override;
Expand All @@ -75,6 +79,7 @@ class Client : public IClient
{
std::shared_ptr<grpc::Channel> channel;
std::unique_ptr<pdpb::PD::Stub> stub;
std::unique_ptr<keyspacepb::Keyspace::Stub> keyspace_stub;
PDConnClient(std::string addr, const ClusterConfig & config)
{
if (config.hasTlsConfig())
Expand All @@ -86,6 +91,7 @@ class Client : public IClient
channel = grpc::CreateChannel(addr, grpc::InsecureChannelCredentials());
}
stub = pdpb::PD::NewStub(channel);
keyspace_stub = keyspacepb::Keyspace::NewStub(channel);
}
};

Expand Down
12 changes: 12 additions & 0 deletions include/pingcap/pd/IClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,21 @@
#include <kvproto/enginepb.pb.h>
#include <kvproto/pdpb.pb.h>
#pragma GCC diagnostic pop
#include <pingcap/Config.h>

namespace pingcap
{
namespace pd
{
using KeyspaceID = uint32_t;

enum : KeyspaceID
{
// The size of KeyspaceID allocated for PD is 3 bytes.
// The NullspaceID is preserved for TiDB API V1 compatibility.
NullspaceID = 0xffffffff,
};

class IClient
{
public:
Expand All @@ -36,6 +46,8 @@ class IClient

virtual uint64_t getGCSafePoint() = 0;

virtual KeyspaceID getKeyspaceID(const std::string & keyspace_name) = 0;

virtual void update(const std::vector<std::string> & addrs, const ClusterConfig & config_) = 0;

virtual bool isMock() = 0;
Expand Down
2 changes: 2 additions & 0 deletions include/pingcap/pd/MockPDClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ class MockPDClient : public IClient

bool isClusterBootstrapped() override { return true; }

KeyspaceID getKeyspaceID(const std::string & /*keyspace_name*/) override { throw Exception("not implemented", pingcap::ErrorCodes::UnknownError); }

void update(const std::vector<std::string> & /*addrs*/, const ClusterConfig & /*config_*/) override { throw Exception("not implemented", pingcap::ErrorCodes::UnknownError); }

bool isMock() override { return true; }
Expand Down
14 changes: 10 additions & 4 deletions src/coprocessor/Client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ std::vector<CopTask> buildCopTasks(
KeyRanges ranges,
RequestPtr cop_req,
kv::StoreType store_type,
pd::KeyspaceID keyspace_id,
Logger * log,
kv::GRPCMetaData meta_data,
std::function<void()> before_send)
Expand All @@ -44,7 +45,7 @@ std::vector<CopTask> buildCopTasks(
// all ranges belong to same region.
if (i == ranges.size())
{
tasks.push_back(CopTask{loc.region, ranges, cop_req, store_type, /*partition_index=*/0, meta_data, before_send});
tasks.push_back(CopTask{loc.region, ranges, cop_req, store_type, /*partition_index=*/0, meta_data, before_send, keyspace_id});
break;
}

Expand All @@ -56,7 +57,7 @@ std::vector<CopTask> buildCopTasks(
task_ranges.push_back(KeyRange{bound.start_key, loc.end_key});
bound.start_key = loc.end_key; // update the last range start key after splitted
}
tasks.push_back(CopTask{loc.region, task_ranges, cop_req, store_type, /*partition_index=*/0, meta_data, before_send});
tasks.push_back(CopTask{loc.region, task_ranges, cop_req, store_type, /*partition_index=*/0, meta_data, before_send, keyspace_id});
ranges.erase(ranges.begin(), ranges.begin() + i);
}
log->debug("has " + std::to_string(tasks.size()) + " tasks.");
Expand Down Expand Up @@ -462,6 +463,11 @@ std::vector<BatchCopTask> buildBatchCopTasks(
std::vector<CopTask> ResponseIter::handleTaskImpl(kv::Backoffer & bo, const CopTask & task)
{
auto req = std::make_shared<::coprocessor::Request>();
auto *ctx = req->mutable_context();
if (task.keyspace_id != pd::NullspaceID) {
ctx->set_api_version(kvrpcpb::APIVersion::V2);
ctx->set_keyspace_id(task.keyspace_id);
}
req->set_tp(task.req->tp);
req->set_start_ts(task.req->start_ts);
req->set_schema_ver(task.req->schema_version);
Expand All @@ -488,7 +494,7 @@ std::vector<CopTask> ResponseIter::handleTaskImpl(kv::Backoffer & bo, const CopT
catch (Exception & e)
{
bo.backoff(kv::boRegionMiss, e);
return buildCopTasks(bo, cluster, task.ranges, task.req, task.store_type, log, task.meta_data, task.before_send);
return buildCopTasks(bo, cluster, task.ranges, task.req, task.store_type, task.keyspace_id, log, task.meta_data, task.before_send);
}
if (resp->has_locked())
{
Expand All @@ -506,7 +512,7 @@ std::vector<CopTask> ResponseIter::handleTaskImpl(kv::Backoffer & bo, const CopT
log->information("get lock and sleep for a while, sleep time is " + std::to_string(before_expired) + "ms.");
bo.backoffWithMaxSleep(kv::boTxnLockFast, before_expired, Exception(resp->locked().DebugString(), ErrorCodes::LockError));
}
return buildCopTasks(bo, cluster, task.ranges, task.req, task.store_type, log, task.meta_data, task.before_send);
return buildCopTasks(bo, cluster, task.ranges, task.req, task.store_type, task.keyspace_id, log, task.meta_data, task.before_send);
}

const std::string & err_msg = resp->other_error();
Expand Down
37 changes: 37 additions & 0 deletions src/pd/Client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -423,6 +423,43 @@ metapb::Store Client::getStore(uint64_t store_id)
return response.store();
}

KeyspaceID Client::getKeyspaceID(const std::string & keyspace_name)
{
keyspacepb::LoadKeyspaceRequest request{};
keyspacepb::LoadKeyspaceResponse response{};

request.set_allocated_header(requestHeader());
request.set_name(keyspace_name);

grpc::ClientContext context;

context.set_deadline(std::chrono::system_clock::now() + pd_timeout);

auto status = leaderClient()->keyspace_stub->LoadKeyspace(&context, request, &response);
if (!status.ok())
{
std::string err_msg = ("get keyspace id failed: " + std::to_string(status.error_code()) + ": " + status.error_message());
log->error(err_msg);
check_leader.store(true);
throw Exception(err_msg, GRPCErrorCode);
}

if (response.header().has_error())
{
std::string err_msg = ("get keyspace id failed: " + response.header().error().message());
log->error(err_msg);
throw Exception(err_msg, InternalError);
}

if (response.keyspace().state() != keyspacepb::KeyspaceState::ENABLED)
{
std::string err_msg = ("keyspace " + keyspace_name + " is not enabled");
log->error(err_msg);
throw Exception(err_msg, KeyspaceNotEnabled);
}
return response.keyspace().id();
}

bool Client::isClusterBootstrapped()
{
pdpb::IsBootstrappedRequest request{};
Expand Down
1 change: 1 addition & 0 deletions src/test/coprocessor_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ TEST_F(TestCoprocessor, testBuildTask1)
ranges,
req,
kv::StoreType::TiKV,
pd::NullspaceID,
&Logger::get("pingcap/coprocessor"));

ASSERT_EQ(tasks.size(), 2);
Expand Down
2 changes: 1 addition & 1 deletion third_party/kvproto
Submodule kvproto updated 74 files
+8 −6 .github/workflows/cpp-test.yaml
+2 −0 .github/workflows/golang-test.yaml
+7 −1 .github/workflows/rust-test.yaml
+1 −3 .gitignore
+6 −11 Cargo.toml
+2 −17 README.md
+15 −7 cpp/CMakeLists.txt
+2 −0 cpp/cmake/find_abseil-cpp.cmake
+13 −7 go.mod
+71 −17 go.sum
+7 −1 include/eraftpb.proto
+1,378 −0 pkg/autoid/autoid.pb.go
+8,207 −4,126 pkg/brpb/brpb.pb.go
+1,283 −926 pkg/cdcpb/cdcpb.pb.go
+1,125 −801 pkg/configpb/configpb.pb.go
+2,218 −752 pkg/coprocessor/coprocessor.pb.go
+289 −228 pkg/deadlock/deadlock.pb.go
+1,684 −859 pkg/debugpb/debugpb.pb.go
+393 −298 pkg/diagnosticspb/diagnosticspb.pb.go
+6 −4 pkg/disk_usage/disk_usage.pb.go
+618 −530 pkg/encryptionpb/encryptionpb.pb.go
+597 −493 pkg/enginepb/enginepb.pb.go
+722 −506 pkg/eraftpb/eraftpb.pb.go
+1,852 −580 pkg/errorpb/errorpb.pb.go
+3,553 −0 pkg/gcpb/gcpb.pb.go
+970 −665 pkg/import_kvpb/import_kvpb.pb.go
+5,610 −3,042 pkg/import_sstpb/import_sstpb.pb.go
+2,321 −0 pkg/keyspacepb/keyspacepb.pb.go
+17,441 −9,094 pkg/kvrpcpb/kvrpcpb.pb.go
+1,830 −0 pkg/logbackuppb/logbackuppb.pb.go
+1,785 −338 pkg/metapb/metapb.pb.go
+1,033 −401 pkg/mpp/mpp.pb.go
+17,941 −10,610 pkg/pdpb/pdpb.pb.go
+5,356 −2,558 pkg/raft_cmdpb/raft_cmdpb.pb.go
+3,297 −1,175 pkg/raft_serverpb/raft_serverpb.pb.go
+2,589 −0 pkg/recoverdatapb/recoverdatapb.pb.go
+544 −166 pkg/replication_modepb/replication_modepb.pb.go
+6,638 −0 pkg/resource_manager/resource_manager.pb.go
+259 −249 pkg/resource_usage_agent/resource_usage_agent.pb.go
+2,615 −2,222 pkg/tikvpb/tikvpb.pb.go
+0 −1,711 pkg/trace/trace.pb.go
+2,209 −0 pkg/tracepb/tracepb.pb.go
+1,669 −0 pkg/tsopb/tsopb.pb.go
+46 −0 proto/autoid.proto
+184 −1 proto/brpb.proto
+18 −0 proto/cdcpb.proto
+34 −0 proto/coprocessor.proto
+11 −0 proto/debugpb.proto
+1 −0 proto/encryptionpb.proto
+38 −0 proto/errorpb.proto
+117 −0 proto/gcpb.proto
+122 −1 proto/import_sstpb.proto
+69 −0 proto/keyspacepb.proto
+327 −39 proto/kvrpcpb.proto
+58 −0 proto/logbackuppb.proto
+55 −0 proto/metapb.proto
+16 −4 proto/mpp.proto
+176 −7 proto/pdpb.proto
+43 −0 proto/raft_cmdpb.proto
+77 −1 proto/raft_serverpb.proto
+82 −0 proto/recoverdatapb.proto
+13 −2 proto/replication_modepb.proto
+181 −0 proto/resource_manager.proto
+16 −0 proto/tikvpb.proto
+0 −52 proto/trace.proto
+60 −0 proto/tracepb.proto
+68 −0 proto/tsopb.proto
+24 −3 scripts/check.sh
+4 −3 scripts/common.sh
+0 −1 scripts/docker-run.sh
+7 −6 scripts/generate_go.sh
+17,283 −0 scripts/proto.lock
+21 −7 src/lib.rs
+1 −1 tools.json