Skip to content

Commit

Permalink
Support keyspace requests (#123)
Browse files Browse the repository at this point in the history
* support keyspace requests

Signed-off-by: iosmanthus <myosmanthustree@gmail.com>

* remove duplicated function sig

Signed-off-by: iosmanthus <myosmanthustree@gmail.com>

* remove static api version config from Cluster

Signed-off-by: iosmanthus <myosmanthustree@gmail.com>

* remove api version config

Signed-off-by: iosmanthus <myosmanthustree@gmail.com>

* check api_version by region range

Signed-off-by: iosmanthus <myosmanthustree@gmail.com>

* introduce keyspace id for CopTask

Signed-off-by: iosmanthus <myosmanthustree@gmail.com>

* fix unused paramenter error

Signed-off-by: iosmanthus <myosmanthustree@gmail.com>

* add some comments for the missing keyspace_id param in buildBatchCopTasks

Signed-off-by: iosmanthus <myosmanthustree@gmail.com>

---------

Signed-off-by: iosmanthus <myosmanthustree@gmail.com>
  • Loading branch information
iosmanthus authored Mar 10, 2023
1 parent 7258c0a commit 04d4081
Show file tree
Hide file tree
Showing 12 changed files with 114 additions and 8 deletions.
17 changes: 16 additions & 1 deletion include/pingcap/Config.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,13 @@
#pragma once

#pragma GCC diagnostic push
#ifdef __clang__
#pragma GCC diagnostic ignored "-Wdeprecated-declarations"
#endif
#include <grpcpp/security/credentials.h>
#pragma GCC diagnostic pop

#include <kvproto/kvrpcpb.pb.h>

#include <fstream>
#include <streambuf>
Expand All @@ -14,15 +22,22 @@ struct ClusterConfig
std::string ca_path;
std::string cert_path;
std::string key_path;
::kvrpcpb::APIVersion api_version = ::kvrpcpb::APIVersion::V1;

ClusterConfig() = default;

ClusterConfig(const std::string & engine_key_, const std::string & engine_value_, const std::string & ca_path_, const std::string & cert_path_, const std::string & key_path_)
ClusterConfig(const std::string & engine_key_,
const std::string & engine_value_,
const std::string & ca_path_,
const std::string & cert_path_,
const std::string & key_path_,
const ::kvrpcpb::APIVersion & api_version_)
: tiflash_engine_key(engine_key_)
, tiflash_engine_value(engine_value_)
, ca_path(ca_path_)
, cert_path(cert_path_)
, key_path(key_path_)
, api_version(api_version_)
{}

bool hasTlsConfig() const { return !ca_path.empty(); }
Expand Down
4 changes: 3 additions & 1 deletion include/pingcap/Exception.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@ enum ErrorCodes : int
CoprocessorError = 15,
TxnNotFound = 16,
NonAsyncCommit = 17,
UnknownError = 18
KeyspaceNotEnabled = 18,
InternalError = 19,
UnknownError = 20
};

class Exception : public Poco::Exception
Expand Down
2 changes: 2 additions & 0 deletions include/pingcap/coprocessor/Client.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ struct CopTask
kv::GRPCMetaData meta_data;
// call before send request, can be used to collect TiFlash metrics.
std::function<void()> before_send;
pd::KeyspaceID keyspace_id;
};

struct RegionInfo
Expand Down Expand Up @@ -233,6 +234,7 @@ std::vector<CopTask> buildCopTasks(
KeyRanges ranges,
RequestPtr cop_req,
kv::StoreType store_type,
pd::KeyspaceID keyspace_id,
Logger * log,
kv::GRPCMetaData meta_data = {},
std::function<void()> before_send = {});
Expand Down
3 changes: 3 additions & 0 deletions include/pingcap/kv/Cluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ struct Cluster

LockResolverPtr lock_resolver;

::kvrpcpb::APIVersion api_version = ::kvrpcpb::APIVersion::V1;

Cluster()
: pd_client(std::make_shared<pd::MockPDClient>())
, rpc_client(std::make_unique<RpcClient>())
Expand All @@ -36,6 +38,7 @@ struct Cluster
, rpc_client(std::make_unique<RpcClient>(config))
, oracle(std::make_unique<pd::Oracle>(pd_client, std::chrono::milliseconds(oracle_update_interval)))
, lock_resolver(std::make_unique<LockResolver>(this))
, api_version(config.api_version)
{}

void update(const std::vector<std::string> & pd_addrs, const ClusterConfig & config) const
Expand Down
2 changes: 1 addition & 1 deletion include/pingcap/kv/RegionClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ struct RegionClient
throw Exception("Region epoch not match after retries: Region " + region_id.toString() + " not in region cache.", RegionEpochNotMatch);
}
const auto & store_addr = ctx->addr;
rpc.setCtx(ctx);
rpc.setCtx(ctx, cluster->api_version);
try
{
cluster->rpc_client->sendRequest(store_addr, rpc, timeout, meta_data);
Expand Down
19 changes: 18 additions & 1 deletion include/pingcap/kv/Rpc.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,26 @@ class RpcCall
, log(&Logger::get("pingcap.tikv"))
{}

void setCtx(RPCContextPtr rpc_ctx)
void setCtx(RPCContextPtr rpc_ctx, kvrpcpb::APIVersion api_version)
{
// TODO: attach the API version with a better manner.
// We check if the region range is in the keyspace, if it does, we use the API v2.
kvrpcpb::APIVersion req_api_ver = kvrpcpb::APIVersion::V1;
if (api_version == kvrpcpb::APIVersion::V2)
{
auto start_key = rpc_ctx->meta.start_key();
auto end_key = rpc_ctx->meta.end_key();
std::string min_raw_v2_key = {'r', 0, 0, 0};
std::string max_raw_v2_key = {'s', 0, 0, 0};
std::string min_txn_v2_key = {'x', 0, 0, 0};
std::string max_txn_v2_key = {'y', 0, 0, 0};
if ((start_key >= min_raw_v2_key && end_key < min_raw_v2_key) || (start_key >= min_txn_v2_key && end_key < max_txn_v2_key))
{
req_api_ver = kvrpcpb::APIVersion::V2;
}
}
::kvrpcpb::Context * context = req->mutable_context();
context->set_api_version(req_api_ver);
context->set_region_id(rpc_ctx->region.id);
context->set_allocated_region_epoch(new metapb::RegionEpoch(rpc_ctx->meta.region_epoch()));
context->set_allocated_peer(new metapb::Peer(rpc_ctx->peer));
Expand Down
6 changes: 6 additions & 0 deletions include/pingcap/pd/Client.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
#include <grpcpp/channel.h>
#include <grpcpp/create_channel.h>
#include <grpcpp/security/credentials.h>
#include <kvproto/keyspacepb.grpc.pb.h>
#include <kvproto/keyspacepb.pb.h>
#include <kvproto/pdpb.grpc.pb.h>
#include <pingcap/Config.h>
#include <pingcap/Log.h>
Expand Down Expand Up @@ -54,6 +56,8 @@ class Client : public IClient

uint64_t getGCSafePoint() override;

KeyspaceID getKeyspaceID(const std::string & keyspace_name) override;

bool isMock() override;

std::string getLeaderUrl() override;
Expand All @@ -75,6 +79,7 @@ class Client : public IClient
{
std::shared_ptr<grpc::Channel> channel;
std::unique_ptr<pdpb::PD::Stub> stub;
std::unique_ptr<keyspacepb::Keyspace::Stub> keyspace_stub;
PDConnClient(std::string addr, const ClusterConfig & config)
{
if (config.hasTlsConfig())
Expand All @@ -86,6 +91,7 @@ class Client : public IClient
channel = grpc::CreateChannel(addr, grpc::InsecureChannelCredentials());
}
stub = pdpb::PD::NewStub(channel);
keyspace_stub = keyspacepb::Keyspace::NewStub(channel);
}
};

Expand Down
12 changes: 12 additions & 0 deletions include/pingcap/pd/IClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,21 @@
#include <kvproto/enginepb.pb.h>
#include <kvproto/pdpb.pb.h>
#pragma GCC diagnostic pop
#include <pingcap/Config.h>

namespace pingcap
{
namespace pd
{
using KeyspaceID = uint32_t;

enum : KeyspaceID
{
// The size of KeyspaceID allocated for PD is 3 bytes.
// The NullspaceID is preserved for TiDB API V1 compatibility.
NullspaceID = 0xffffffff,
};

class IClient
{
public:
Expand All @@ -36,6 +46,8 @@ class IClient

virtual uint64_t getGCSafePoint() = 0;

virtual KeyspaceID getKeyspaceID(const std::string & keyspace_name) = 0;

virtual void update(const std::vector<std::string> & addrs, const ClusterConfig & config_) = 0;

virtual bool isMock() = 0;
Expand Down
2 changes: 2 additions & 0 deletions include/pingcap/pd/MockPDClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ class MockPDClient : public IClient

bool isClusterBootstrapped() override { return true; }

KeyspaceID getKeyspaceID(const std::string & /*keyspace_name*/) override { throw Exception("not implemented", pingcap::ErrorCodes::UnknownError); }

void update(const std::vector<std::string> & /*addrs*/, const ClusterConfig & /*config_*/) override { throw Exception("not implemented", pingcap::ErrorCodes::UnknownError); }

bool isMock() override { return true; }
Expand Down
17 changes: 13 additions & 4 deletions src/coprocessor/Client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ std::vector<CopTask> buildCopTasks(
KeyRanges ranges,
RequestPtr cop_req,
kv::StoreType store_type,
pd::KeyspaceID keyspace_id,
Logger * log,
kv::GRPCMetaData meta_data,
std::function<void()> before_send)
Expand All @@ -44,7 +45,7 @@ std::vector<CopTask> buildCopTasks(
// all ranges belong to same region.
if (i == ranges.size())
{
tasks.push_back(CopTask{loc.region, ranges, cop_req, store_type, /*partition_index=*/0, meta_data, before_send});
tasks.push_back(CopTask{loc.region, ranges, cop_req, store_type, /*partition_index=*/0, meta_data, before_send, keyspace_id});
break;
}

Expand All @@ -56,7 +57,7 @@ std::vector<CopTask> buildCopTasks(
task_ranges.push_back(KeyRange{bound.start_key, loc.end_key});
bound.start_key = loc.end_key; // update the last range start key after splitted
}
tasks.push_back(CopTask{loc.region, task_ranges, cop_req, store_type, /*partition_index=*/0, meta_data, before_send});
tasks.push_back(CopTask{loc.region, task_ranges, cop_req, store_type, /*partition_index=*/0, meta_data, before_send, keyspace_id});
ranges.erase(ranges.begin(), ranges.begin() + i);
}
log->debug("has " + std::to_string(tasks.size()) + " tasks.");
Expand Down Expand Up @@ -305,6 +306,8 @@ std::vector<BatchCopTask> balanceBatchCopTasks(kv::RegionCachePtr & cache, std::

// The elements in the two `physical_table_ids` and `ranges_for_each_physical_table` should be in one-to-one mapping.
// When build batch cop tasks for partition table, physical_table_ids.size() may be greater than 1.
// NOTE: `buildBatchCopTasks` do not need keyspace_id parameter yet since the batch tasks will be wrapped into MPP tasks
// and the keyspace_id attachment is finished before the MPP tasks are sent.
std::vector<BatchCopTask> buildBatchCopTasks(
kv::Backoffer & bo,
kv::Cluster * cluster,
Expand Down Expand Up @@ -462,6 +465,12 @@ std::vector<BatchCopTask> buildBatchCopTasks(
std::vector<CopTask> ResponseIter::handleTaskImpl(kv::Backoffer & bo, const CopTask & task)
{
auto req = std::make_shared<::coprocessor::Request>();
auto * ctx = req->mutable_context();
if (task.keyspace_id != pd::NullspaceID)
{
ctx->set_api_version(kvrpcpb::APIVersion::V2);
ctx->set_keyspace_id(task.keyspace_id);
}
req->set_tp(task.req->tp);
req->set_start_ts(task.req->start_ts);
req->set_schema_ver(task.req->schema_version);
Expand All @@ -488,7 +497,7 @@ std::vector<CopTask> ResponseIter::handleTaskImpl(kv::Backoffer & bo, const CopT
catch (Exception & e)
{
bo.backoff(kv::boRegionMiss, e);
return buildCopTasks(bo, cluster, task.ranges, task.req, task.store_type, log, task.meta_data, task.before_send);
return buildCopTasks(bo, cluster, task.ranges, task.req, task.store_type, task.keyspace_id, log, task.meta_data, task.before_send);
}
if (resp->has_locked())
{
Expand All @@ -506,7 +515,7 @@ std::vector<CopTask> ResponseIter::handleTaskImpl(kv::Backoffer & bo, const CopT
log->information("get lock and sleep for a while, sleep time is " + std::to_string(before_expired) + "ms.");
bo.backoffWithMaxSleep(kv::boTxnLockFast, before_expired, Exception(resp->locked().DebugString(), ErrorCodes::LockError));
}
return buildCopTasks(bo, cluster, task.ranges, task.req, task.store_type, log, task.meta_data, task.before_send);
return buildCopTasks(bo, cluster, task.ranges, task.req, task.store_type, task.keyspace_id, log, task.meta_data, task.before_send);
}

const std::string & err_msg = resp->other_error();
Expand Down
37 changes: 37 additions & 0 deletions src/pd/Client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -423,6 +423,43 @@ metapb::Store Client::getStore(uint64_t store_id)
return response.store();
}

KeyspaceID Client::getKeyspaceID(const std::string & keyspace_name)
{
keyspacepb::LoadKeyspaceRequest request{};
keyspacepb::LoadKeyspaceResponse response{};

request.set_allocated_header(requestHeader());
request.set_name(keyspace_name);

grpc::ClientContext context;

context.set_deadline(std::chrono::system_clock::now() + pd_timeout);

auto status = leaderClient()->keyspace_stub->LoadKeyspace(&context, request, &response);
if (!status.ok())
{
std::string err_msg = ("get keyspace id failed: " + std::to_string(status.error_code()) + ": " + status.error_message());
log->error(err_msg);
check_leader.store(true);
throw Exception(err_msg, GRPCErrorCode);
}

if (response.header().has_error())
{
std::string err_msg = ("get keyspace id failed: " + response.header().error().message());
log->error(err_msg);
throw Exception(err_msg, InternalError);
}

if (response.keyspace().state() != keyspacepb::KeyspaceState::ENABLED)
{
std::string err_msg = ("keyspace " + keyspace_name + " is not enabled");
log->error(err_msg);
throw Exception(err_msg, KeyspaceNotEnabled);
}
return response.keyspace().id();
}

bool Client::isClusterBootstrapped()
{
pdpb::IsBootstrappedRequest request{};
Expand Down
1 change: 1 addition & 0 deletions src/test/coprocessor_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ TEST_F(TestCoprocessor, testBuildTask1)
ranges,
req,
kv::StoreType::TiKV,
pd::NullspaceID,
&Logger::get("pingcap/coprocessor"));

ASSERT_EQ(tasks.size(), 2);
Expand Down

0 comments on commit 04d4081

Please sign in to comment.