Skip to content

Commit

Permalink
integrate read_ru into tiflash resource control (#8097)
Browse files Browse the repository at this point in the history
close #8098
  • Loading branch information
guo-shaoge authored Sep 20, 2023
1 parent 45f9bb1 commit 0c2286c
Show file tree
Hide file tree
Showing 19 changed files with 77 additions and 38 deletions.
6 changes: 3 additions & 3 deletions dbms/src/Flash/Coprocessor/DAGContext.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -432,13 +432,13 @@ const SingleTableRegions & DAGContext::getTableRegionsInfoByTableID(Int64 table_

RU DAGContext::getReadRU() const
{
double ru = 0.0;
UInt64 read_bytes = 0;
for (const auto & [id, sc] : scan_context_map)
{
(void)id; // Disable unused variable warnning.
ru += sc->getReadRU();
read_bytes += sc->total_user_read_bytes;
}
return ru;
return bytesToRU(read_bytes);
}

} // namespace DB
3 changes: 3 additions & 0 deletions dbms/src/Flash/Coprocessor/DAGContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,8 @@ class DAGContext

KeyspaceID getKeyspaceID() const { return keyspace_id; }
String getResourceGroupName() { return resource_group_name; }
void enableResourceControl() { enable_resource_control = true; }
bool isResourceControlEnabled() const { return enable_resource_control; }

RU getReadRU() const;

Expand Down Expand Up @@ -450,6 +452,7 @@ class DAGContext
const KeyspaceID keyspace_id = NullspaceID;

const String resource_group_name;
bool enable_resource_control = false;

// Used to determine the execution mode
// - None: request has not been executed yet
Expand Down
3 changes: 2 additions & 1 deletion dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -613,7 +613,8 @@ void DAGStorageInterpreter::prepare()

// Do learner read
DAGContext & dag_context = *context.getDAGContext();
auto scan_context = std::make_shared<DM::ScanContext>();
auto scan_context
= std::make_shared<DM::ScanContext>(dag_context.getResourceGroupName(), dag_context.isResourceControlEnabled());
dag_context.scan_context_map[table_scan.getTableScanExecutorID()] = scan_context;
mvcc_query_info->scan_context = scan_context;

Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Flash/Executor/DataStreamExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ RU DataStreamExecutor::collectRequestUnit()
// When the number of threads is greater than the number of cpu cores,
// BlockInputStream's estimated cpu time will be much greater than the actual value.
if (execute_time_ns <= 0 || total_thread_cnt <= logical_cpu_cores)
return toRU(execute_time_ns);
return cpuTimeToRU(execute_time_ns);

// Here we use `execute_time_ns / thread_cnt` to get the average execute time of each thread.
// So we have `per_thread_execute_time_ns = execute_time_ns / estimate_thread_cnt`.
Expand All @@ -115,7 +115,7 @@ RU DataStreamExecutor::collectRequestUnit()
// We can assume `condition.wait` takes half of datastream execute time.
// TODO find a more reasonable ratio for `condition.wait`.
cpu_time_ns /= 2;
return toRU(ceil(cpu_time_ns));
return cpuTimeToRU(static_cast<UInt64>(ceil(cpu_time_ns)));
}

Block DataStreamExecutor::getSampleBlock() const
Expand Down
3 changes: 1 addition & 2 deletions dbms/src/Flash/Executor/PipelineExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,6 @@ ExecutionResult PipelineExecutor::execute(ResultHandler && result_handler)
wait();
}
LOG_TRACE(log, "query finish with {}", exec_context.getQueryProfileInfo().toJson());

return exec_context.toExecutionResult();
}

Expand Down Expand Up @@ -140,7 +139,7 @@ RU PipelineExecutor::collectRequestUnit()
// It may be necessary to obtain CPU time using a more accurate method, such as using system call `clock_gettime`.
const auto & query_profile_info = exec_context.getQueryProfileInfo();
auto cpu_time_ns = query_profile_info.getCPUExecuteTimeNs();
return toRU(cpu_time_ns);
return cpuTimeToRU(cpu_time_ns);
}

Block PipelineExecutor::getSampleBlock() const
Expand Down
8 changes: 4 additions & 4 deletions dbms/src/Flash/Executor/tests/gtest_to_ru.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,22 +23,22 @@ class TestToRU : public ::testing::Test

TEST_F(TestToRU, base)
{
ASSERT_EQ(0, toRU(0));
ASSERT_EQ(0, cpuTimeToRU(0));

auto base_ru = toRU(1);
auto base_ru = cpuTimeToRU(1);
ASSERT_TRUE(base_ru > 0);

for (size_t i = 1; i < 10; ++i)
{
auto ru = toRU(i);
auto ru = cpuTimeToRU(i);
ASSERT_TRUE(ru >= base_ru);
base_ru = ru;
}

constexpr auto ten_ms = 10'000'000;
for (size_t i = 1; i < 20; ++i)
{
auto ru = toRU(i * ten_ms);
auto ru = cpuTimeToRU(i * ten_ms);
ASSERT_TRUE(ru > base_ru);
base_ru = ru;
}
Expand Down
8 changes: 7 additions & 1 deletion dbms/src/Flash/Executor/toRU.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,18 @@ UInt64 toCPUTimeMillisecond(UInt64 cpu_time_ns)
}

// 1 ru = 3 millisecond cpu time
RU toRU(UInt64 cpu_time_ns)
RU cpuTimeToRU(UInt64 cpu_time_ns)
{
if (unlikely(cpu_time_ns == 0))
return 0;

auto cpu_time_millisecond = toCPUTimeMillisecond(cpu_time_ns);
return static_cast<double>(cpu_time_millisecond) / 3;
}

// 1ru = 64KB
RU bytesToRU(UInt64 bytes)
{
return static_cast<double>(bytes) / 1024.0 / 64.0;
}
} // namespace DB
3 changes: 2 additions & 1 deletion dbms/src/Flash/Executor/toRU.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,5 +24,6 @@ using RU = double;
UInt64 toCPUTimeMillisecond(UInt64 cpu_time_ns);

// Convert cpu time nanoseconds to Request Unit.
RU toRU(UInt64 cpu_time_ns);
RU cpuTimeToRU(UInt64 cpu_time_ns);
RU bytesToRU(UInt64 bytes);
} // namespace DB
8 changes: 4 additions & 4 deletions dbms/src/Flash/Pipeline/Pipeline.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -302,18 +302,18 @@ bool Pipeline::isSupported(const tipb::DAGRequest & dag_request, const Settings
case tipb::ExecType::TypeJoin:
return true;
default:
if (settings.enforce_enable_pipeline)
if (settings.enforce_enable_resource_control)
throw Exception(fmt::format(
"Pipeline mode does not support {}, and an error is reported because the setting "
"enforce_enable_pipeline is true.",
"enforce_enable_resource_control is true.",
magic_enum::enum_name(executor.tp())));
is_supported = false;
return false;
}
});
if (settings.enforce_enable_pipeline && !is_supported)
if (settings.enforce_enable_resource_control && !is_supported)
throw Exception("There is an unsupported operator in pipeline model, and an error is reported because the "
"setting enforce_enable_pipeline is true.");
"setting enforce_enable_resource_control is true.");
return is_supported;
}
} // namespace DB
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ template <typename NestedTaskQueueType>
void ResourceControlQueue<NestedTaskQueueType>::updateStatistics(const TaskPtr & task, ExecTaskStatus, UInt64 inc_value)
{
assert(task);
auto ru = toRU(inc_value);
auto ru = cpuTimeToRU(inc_value);
const String & name = task->getResourceGroupName();
LOG_TRACE(logger, "resource group {} will consume {} RU(or {} cpu time in ns)", name, ru, inc_value);
LocalAdmissionController::global_instance->consumeResource(name, ru, inc_value);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ TaskQueuePtr CPUImpl::newTaskQueue(TaskQueueType type)
{
switch (type)
{
// the default queue is RC_MLFQ.
// the default queue is RCQ_MLFQ.
case TaskQueueType::DEFAULT:
case TaskQueueType::RCQ_MLFQ:
return std::make_unique<ResourceControlQueue<CPUMultiLevelFeedbackQueue>>();
Expand Down
13 changes: 12 additions & 1 deletion dbms/src/Flash/ResourceControl/MockLocalAdmissionController.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ inline uint64_t nopGetPriority(const std::string &)
class MockLocalAdmissionController final : private boost::noncopyable
{
public:
static constexpr uint64_t HIGHEST_RESOURCE_GROUP_PRIORITY = 0;

MockLocalAdmissionController()
: consume_resource_func(nopConsumeResource)
, get_priority_func(nopGetPriority)
Expand All @@ -50,9 +52,18 @@ class MockLocalAdmissionController final : private boost::noncopyable

void consumeResource(const std::string & name, double ru, uint64_t cpu_time_ns) const
{
if (name.empty())
return;

consume_resource_func(name, ru, cpu_time_ns);
}
std::optional<uint64_t> getPriority(const std::string & name) const { return {get_priority_func(name)}; }
std::optional<uint64_t> getPriority(const std::string & name) const
{
if (name.empty())
return {HIGHEST_RESOURCE_GROUP_PRIORITY};

return {get_priority_func(name)};
}
void warmupResourceGroupInfoCache(const std::string &) {}

void registerRefillTokenCallback(const std::function<void()> & cb)
Expand Down
7 changes: 4 additions & 3 deletions dbms/src/Flash/executeQuery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,7 @@ std::optional<QueryExecutorPtr> executeAsPipeline(Context & context, bool intern
if (likely(!internal))
LOG_INFO(logger, fmt::format("Query pipeline:\n{}", executor->toString()));
dag_context.switchToPipelineMode();
dag_context.enableResourceControl();
return {std::move(executor)};
}

Expand All @@ -219,7 +220,7 @@ QueryExecutorPtr executeAsBlockIO(Context & context, bool internal)

QueryExecutorPtr queryExecute(Context & context, bool internal)
{
if (context.getSettingsRef().enforce_enable_pipeline)
if (context.getSettingsRef().enforce_enable_resource_control)
{
RUNTIME_CHECK_MSG(
TaskScheduler::instance,
Expand All @@ -229,10 +230,10 @@ QueryExecutorPtr queryExecute(Context & context, bool internal)
RUNTIME_CHECK_MSG(
res,
"Failed to execute query using pipeline model, and an error is reported because the setting "
"enforce_enable_pipeline is true.");
"enforce_enable_resource_control is true.");
return std::move(*res);
}
if (context.getSettingsRef().enable_planner && context.getSettingsRef().enable_pipeline)
if (context.getSettingsRef().enable_planner && context.getSettingsRef().enable_resource_control)
{
if (auto res = executeAsPipeline(context, internal); res)
return std::move(*res);
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Interpreters/Settings.h
Original file line number Diff line number Diff line change
Expand Up @@ -300,8 +300,8 @@ struct Settings
M(SettingUInt64, max_spilled_rows_per_file, 200000, "Max spilled data rows per spill file, 200000 as the default value, 0 means no limit.") \
M(SettingUInt64, max_spilled_bytes_per_file, 0, "Max spilled data bytes per spill file, 0 as the default value, 0 means no limit.") \
M(SettingBool, enable_planner, true, "Enable planner") \
M(SettingBool, enable_pipeline, true, "Enable pipeline model") \
M(SettingBool, enforce_enable_pipeline, false, "Enforce the enablement of the pipeline model") \
M(SettingBool, enable_resource_control, true, "Enable resource control") \
M(SettingBool, enforce_enable_resource_control, false, "Enforce the enablement of the resource control") \
M(SettingUInt64, pipeline_cpu_task_thread_pool_size, 0, "The size of cpu task thread pool. 0 means using number_of_logical_cpu_cores.") \
M(SettingUInt64, pipeline_io_task_thread_pool_size, 0, "The size of io task thread pool. 0 means using number_of_logical_cpu_cores.") \
M(SettingTaskQueueType, pipeline_cpu_task_thread_pool_queue_type, TaskQueueType::DEFAULT, "The task queue of cpu task thread pool") \
Expand Down
10 changes: 9 additions & 1 deletion dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileSetReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include <Flash/ResourceControl/LocalAdmissionController.h>
#include <Storages/DeltaMerge/ColumnFile/ColumnFileBig.h>
#include <Storages/DeltaMerge/ColumnFile/ColumnFileDeleteRange.h>
#include <Storages/DeltaMerge/ColumnFile/ColumnFileInMemory.h>
Expand Down Expand Up @@ -186,7 +187,14 @@ size_t ColumnFileSetReader::readRows(
}
for (const auto & col : output_columns)
{
context.scan_context->total_user_read_bytes += col->byteSize();
const auto delta_bytes = col->byteSize();
context.scan_context->total_user_read_bytes += delta_bytes;

if (context.scan_context->enable_resource_control)
LocalAdmissionController::global_instance->consumeResource(
context.scan_context->resource_group_name,
bytesToRU(delta_bytes),
0);
}
return actual_read;
}
Expand Down
12 changes: 7 additions & 5 deletions dbms/src/Storages/DeltaMerge/ScanContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,10 @@ class ScanContext
std::atomic<uint64_t> total_disagg_read_cache_miss_size{0};


ScanContext() = default;
explicit ScanContext(const String & name = "", bool enable_resource_control_ = false)
: resource_group_name(name)
, enable_resource_control(enable_resource_control_)
{}

void deserialize(const tipb::TiFlashScanContext & tiflash_scan_context_pb)
{
Expand Down Expand Up @@ -130,11 +133,10 @@ class ScanContext
total_disagg_read_cache_miss_size += other.total_disagg_read_cache_miss_size();
}

// Reference: https://docs.pingcap.com/tidb/dev/tidb-resource-control
// For Read I/O, 1/64 RU per KB.
double getReadRU() const { return static_cast<double>(total_user_read_bytes) / 1024.0 / 64.0; }
const String resource_group_name;
const bool enable_resource_control;
};

using ScanContextPtr = std::shared_ptr<ScanContext>;

} // namespace DB::DM
} // namespace DB::DM
9 changes: 8 additions & 1 deletion dbms/src/Storages/DeltaMerge/SkippableBlockInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include <Columns/ColumnsNumber.h>
#include <Core/Block.h>
#include <DataStreams/IBlockInputStream.h>
#include <Flash/ResourceControl/LocalAdmissionController.h>
#include <Storages/DeltaMerge/DeltaMergeDefines.h>
#include <Storages/DeltaMerge/DeltaMergeHelpers.h>

Expand Down Expand Up @@ -218,6 +219,12 @@ class ConcatSkippableBlockInputStream : public SkippableBlockInputStream
if (likely(scan_context != nullptr))
{
scan_context->total_user_read_bytes += bytes;

if (scan_context->enable_resource_control)
LocalAdmissionController::global_instance->consumeResource(
scan_context->resource_group_name,
bytesToRU(bytes),
0);
}
}
BlockInputStreams::iterator current_stream;
Expand All @@ -227,4 +234,4 @@ class ConcatSkippableBlockInputStream : public SkippableBlockInputStream
};

} // namespace DM
} // namespace DB
} // namespace DB
8 changes: 4 additions & 4 deletions dbms/src/TestUtils/ExecutorTestUtils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -317,8 +317,8 @@ void ExecutorTest::enablePlanner(bool is_enable) const

void ExecutorTest::enablePipeline(bool is_enable) const
{
context.context->setSetting("enable_pipeline", is_enable ? "true" : "false");
context.context->setSetting("enforce_enable_pipeline", is_enable ? "true" : "false");
context.context->setSetting("enable_resource_control", is_enable ? "true" : "false");
context.context->setSetting("enforce_enable_resource_control", is_enable ? "true" : "false");
}

// ywq todo rename
Expand Down Expand Up @@ -376,8 +376,8 @@ void ExecutorTest::testForExecutionSummary(
statistics_collector.initialize(&dag_context);
auto summaries = statistics_collector.genExecutionSummaryResponse().execution_summaries();
bool enable_planner = context.context->getSettingsRef().enable_planner;
bool enable_pipeline = context.context->getSettingsRef().enable_pipeline
|| context.context->getSettingsRef().enforce_enable_pipeline;
bool enable_pipeline = context.context->getSettingsRef().enable_resource_control
|| context.context->getSettingsRef().enforce_enable_resource_control;
ASSERT_EQ(summaries.size(), expect.size())
<< "\n"
<< testInfoMsg(request, enable_planner, enable_pipeline, concurrency, DEFAULT_BLOCK_SIZE);
Expand Down
2 changes: 1 addition & 1 deletion tests/docker/config/tics_dt.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,4 @@ level = "trace"

[profiles]
[profiles.default]
enable_pipeline = 0
enable_resource_control = 0

0 comments on commit 0c2286c

Please sign in to comment.