Skip to content

Commit

Permalink
Added log settings flags
Browse files Browse the repository at this point in the history
  • Loading branch information
GrigoriyPA committed Jan 6, 2025
1 parent df2c912 commit aa4e3d5
Show file tree
Hide file tree
Showing 5 changed files with 124 additions and 40 deletions.
94 changes: 80 additions & 14 deletions ydb/tests/tools/kqprun/kqprun.cpp
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
#include "src/kqp_runner.h"

#include <cstdio>

#include <contrib/libs/protobuf/src/google/protobuf/text_format.h>

#include <library/cpp/colorizer/colors.h>
Expand All @@ -15,11 +13,13 @@

#include <ydb/library/aclib/aclib.h>
#include <ydb/library/yaml_config/yaml_config.h>

#include <yql/essentials/minikql/invoke_builtins/mkql_builtins.h>
#include <yql/essentials/public/udf/udf_static_registry.h>

#include <yt/yql/providers/yt/gateway/file/yql_yt_file.h>
#include <yt/yql/providers/yt/gateway/file/yql_yt_file_comp_nodes.h>
#include <yt/yql/providers/yt/lib/yt_download/yt_download.h>
#include <yql/essentials/public/udf/udf_static_registry.h>


namespace NKqpRun {
Expand Down Expand Up @@ -135,7 +135,7 @@ struct TExecutionOptions {
void ValidateOptionsSizes(const TRunnerOptions& runnerOptions) const {
const auto checker = [numberQueries = ScriptQueries.size()](size_t checkSize, const TString& optionName) {
if (checkSize > numberQueries) {
ythrow yexception() << "Too many " << optionName << ". Specified " << checkSize << ", when number of queries is " << numberQueries;
ythrow yexception() << "Too many " << optionName << ". Specified " << checkSize << ", when number of script queries is " << numberQueries;
}
};

Expand Down Expand Up @@ -431,6 +431,9 @@ class TMain : public TMainClassArgs {
bool ExcludeLinkedUdfs = false;
bool EmulateYt = false;

std::optional<NActors::NLog::EPriority> DefaultLogPriority;
std::unordered_map<NKikimrServices::EServiceKikimr, NActors::NLog::EPriority> LogPriorities;

static TString LoadFile(const TString& file) {
return TFileInput(file).ReadAll();
}
Expand Down Expand Up @@ -466,6 +469,10 @@ class TMain : public TMainClassArgs {
return choices;
}

bool Contains(const TString& choice) const {
return ChoicesMap.contains(choice);
}

private:
const std::map<TString, TResult> ChoicesMap;
};
Expand All @@ -483,11 +490,13 @@ class TMain : public TMainClassArgs {
.Handler1([this](const NLastGetopt::TOptsParser* option) {
ExecutionOptions.SchemeQuery = LoadFile(option->CurVal());
});

options.AddLongOption('p', "script-query", "Script query to execute (typically DML query)")
.RequiredArgument("file")
.Handler1([this](const NLastGetopt::TOptsParser* option) {
ExecutionOptions.ScriptQueries.emplace_back(LoadFile(option->CurVal()));
});

options.AddLongOption("templates", "Enable templates for -s and -p queries, such as ${YQL_TOKEN} and ${QUERY_ID}")
.NoArgument()
.SetFlag(&ExecutionOptions.UseTemplates);
Expand All @@ -499,10 +508,10 @@ class TMain : public TMainClassArgs {
TStringBuf filePath;
TStringBuf(option->CurVal()).Split('@', tableName, filePath);
if (tableName.empty() || filePath.empty()) {
ythrow yexception() << "Incorrect table mapping, expected form table@file, e.g. yt.Root/plato.Input@input.txt";
ythrow yexception() << "Incorrect table mapping, expected form table@file, e. g. yt.Root/plato.Input@input.txt";
}
if (TablesMapping.contains(tableName)) {
ythrow yexception() << "Got duplicate table name: " << tableName;
ythrow yexception() << "Got duplicated table name: " << tableName;
}
TablesMapping[tableName] = filePath;
});
Expand All @@ -523,9 +532,11 @@ class TMain : public TMainClassArgs {
options.AddLongOption('u', "udf", "Load shared library with UDF by given path")
.RequiredArgument("file")
.EmplaceTo(&UdfsPaths);

options.AddLongOption("udfs-dir", "Load all shared libraries with UDFs found in given directory")
.RequiredArgument("directory")
.StoreResult(&UdfsDirectory);

options.AddLongOption("exclude-linked-udfs", "Exclude linked udfs when same udf passed from -u or --udfs-dir")
.NoArgument()
.SetFlag(&ExcludeLinkedUdfs);
Expand All @@ -540,13 +551,51 @@ class TMain : public TMainClassArgs {
std::remove(file.c_str());
}
});

TChoices<NActors::NLog::EPriority> logPriority({
{"emerg", NActors::NLog::EPriority::PRI_EMERG},
{"alert", NActors::NLog::EPriority::PRI_ALERT},
{"crit", NActors::NLog::EPriority::PRI_CRIT},
{"error", NActors::NLog::EPriority::PRI_ERROR},
{"warn", NActors::NLog::EPriority::PRI_WARN},
{"notice", NActors::NLog::EPriority::PRI_NOTICE},
{"info", NActors::NLog::EPriority::PRI_INFO},
{"debug", NActors::NLog::EPriority::PRI_DEBUG},
{"trace", NActors::NLog::EPriority::PRI_TRACE},
});
options.AddLongOption("log-default", "Default log priority")
.RequiredArgument("priority")
.Choices(logPriority.GetChoices())
.StoreMappedResultT<TString>(&DefaultLogPriority, logPriority);

options.AddLongOption("log", "Component log priority in format <component>=<priority> (e. g. KQP_YQL=trace)")
.RequiredArgument("component priority")
.Handler1([this, logPriority](const NLastGetopt::TOptsParser* option) {
TStringBuf component;
TStringBuf priority;
TStringBuf(option->CurVal()).Split('=', component, priority);
if (component.empty() || priority.empty()) {
ythrow yexception() << "Incorrect log setting, expected form component=priority, e. g. KQP_YQL=trace";
}

const auto service = GetLogService(TString(component));
if (LogPriorities.contains(service)) {
ythrow yexception() << "Got duplicated log service name: " << component;
}

if (!logPriority.Contains(TString(priority))) {
ythrow yexception() << "Incorrect log priority: " << priority;
}
LogPriorities[service] = logPriority(TString(priority));
});

TChoices<TRunnerOptions::ETraceOptType> traceOpt({
{"all", TRunnerOptions::ETraceOptType::All},
{"scheme", TRunnerOptions::ETraceOptType::Scheme},
{"script", TRunnerOptions::ETraceOptType::Script},
{"disabled", TRunnerOptions::ETraceOptType::Disabled}
});
options.AddLongOption('T', "trace-opt", "print AST in the begin of each transformation (use script@<query id> for tracing one -p query)")
options.AddLongOption('T', "trace-opt", "Print AST in the begin of each transformation (use script@<query id> for tracing one -p query)")
.RequiredArgument("trace-opt-query")
.DefaultValue("disabled")
.Choices(traceOpt.GetChoices())
Expand All @@ -555,9 +604,11 @@ class TMain : public TMainClassArgs {
RunnerOptions.YdbSettings.TraceOptEnabled = traceOptType != NKqpRun::TRunnerOptions::ETraceOptType::Disabled;
return traceOptType;
});
options.AddLongOption('I', "trace-opt-index", "index of -p query to use --trace-opt, starts from zero")

options.AddLongOption('I', "trace-opt-index", "Index of -p query to use --trace-opt, starts from zero")
.RequiredArgument("uint")
.StoreResult(&RunnerOptions.TraceOptScriptId);

options.AddLongOption("trace-id", "Trace id for -p queries")
.RequiredArgument("id")
.EmplaceTo(&ExecutionOptions.TraceIds);
Expand All @@ -566,10 +617,12 @@ class TMain : public TMainClassArgs {
.RequiredArgument("file")
.DefaultValue("-")
.StoreMappedResultT<TString>(&RunnerOptions.ResultOutput, &GetDefaultOutput);

options.AddLongOption('L', "result-rows-limit", "Rows limit for script execution results")
.RequiredArgument("uint")
.DefaultValue(0)
.StoreResult(&ExecutionOptions.ResultsRowsLimit);

TChoices<TRunnerOptions::EResultOutputFormat> resultFormat({
{"rows", TRunnerOptions::EResultOutputFormat::RowsJson},
{"full-json", TRunnerOptions::EResultOutputFormat::FullJson},
Expand All @@ -596,6 +649,7 @@ class TMain : public TMainClassArgs {
.Handler1([this](const NLastGetopt::TOptsParser* option) {
RunnerOptions.ScriptQueryPlanOutputs.emplace_back(GetDefaultOutput(TString(option->CurValOrDef())));
});

options.AddLongOption("script-statistics", "File with script inprogress statistics")
.RequiredArgument("file")
.Handler1([this](const NLastGetopt::TOptsParser* option) {
Expand All @@ -605,6 +659,7 @@ class TMain : public TMainClassArgs {
}
RunnerOptions.InProgressStatisticsOutputFiles.emplace_back(file);
});

TChoices<NYdb::NConsoleClient::EDataFormat> planFormat({
{"pretty", NYdb::NConsoleClient::EDataFormat::Pretty},
{"table", NYdb::NConsoleClient::EDataFormat::PrettyTable},
Expand Down Expand Up @@ -641,19 +696,21 @@ class TMain : public TMainClassArgs {
TString choice(option->CurValOrDef());
ExecutionOptions.ExecutionCases.emplace_back(executionCase(choice));
});

options.AddLongOption("inflight-limit", "In flight limit for async queries (use 0 for unlimited)")
.RequiredArgument("uint")
.DefaultValue(0)
.StoreResult(&RunnerOptions.YdbSettings.AsyncQueriesSettings.InFlightLimit);
TChoices<TAsyncQueriesSettings::EVerbose> verbose({
{"each-query", TAsyncQueriesSettings::EVerbose::EachQuery},
{"final", TAsyncQueriesSettings::EVerbose::Final}
});

options.AddLongOption("verbose", "Common verbose level (max level 2)")
.RequiredArgument("uint")
.DefaultValue(1)
.StoreResult(&RunnerOptions.YdbSettings.VerboseLevel);

TChoices<TAsyncQueriesSettings::EVerbose> verbose({
{"each-query", TAsyncQueriesSettings::EVerbose::EachQuery},
{"final", TAsyncQueriesSettings::EVerbose::Final}
});
options.AddLongOption("async-verbose", "Verbose type for async queries")
.RequiredArgument("type")
.DefaultValue("each-query")
Expand Down Expand Up @@ -690,10 +747,12 @@ class TMain : public TMainClassArgs {
.RequiredArgument("uint")
.DefaultValue(ExecutionOptions.LoopCount)
.StoreResult(&ExecutionOptions.LoopCount);

options.AddLongOption("loop-delay", "Delay in milliseconds between loop steps")
.RequiredArgument("uint")
.DefaultValue(0)
.StoreMappedResultT<ui64>(&ExecutionOptions.LoopDelay, &TDuration::MilliSeconds<ui64>);

options.AddLongOption("continue-after-fail", "Don't not stop requests execution after fails")
.NoArgument()
.SetFlag(&ExecutionOptions.ContinueAfterFail);
Expand Down Expand Up @@ -803,12 +862,19 @@ class TMain : public TMainClassArgs {

RunnerOptions.YdbSettings.YqlToken = YqlToken;
RunnerOptions.YdbSettings.FunctionRegistry = CreateFunctionRegistry(UdfsDirectory, UdfsPaths, ExcludeLinkedUdfs).Get();

auto& appConfig = RunnerOptions.YdbSettings.AppConfig;
if (ExecutionOptions.ResultsRowsLimit) {
RunnerOptions.YdbSettings.AppConfig.MutableQueryServiceConfig()->SetScriptResultRowsLimit(ExecutionOptions.ResultsRowsLimit);
appConfig.MutableQueryServiceConfig()->SetScriptResultRowsLimit(ExecutionOptions.ResultsRowsLimit);
}

if (DefaultLogPriority) {
appConfig.MutableLogConfig()->SetDefaultLevel(*DefaultLogPriority);
}
ModifyLogPriorities(LogPriorities, *appConfig.MutableLogConfig());

if (EmulateYt) {
const auto& fileStorageConfig = RunnerOptions.YdbSettings.AppConfig.GetQueryServiceConfig().GetFileStorage();
const auto& fileStorageConfig = appConfig.GetQueryServiceConfig().GetFileStorage();
auto fileStorage = WithAsync(CreateFileStorage(fileStorageConfig, {MakeYtDownloader(fileStorageConfig)}));
auto ytFileServices = NYql::NFile::TYtFileServices::Make(RunnerOptions.YdbSettings.FunctionRegistry.Get(), TablesMapping, fileStorage);
RunnerOptions.YdbSettings.YtGateway = NYql::CreateYtFileGateway(ytFileServices);
Expand Down
29 changes: 29 additions & 0 deletions ydb/tests/tools/kqprun/src/common.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
#include "common.h"


namespace NKqpRun {

NKikimrServices::EServiceKikimr GetLogService(const TString& serviceName) {
NKikimrServices::EServiceKikimr service;
if (!NKikimrServices::EServiceKikimr_Parse(serviceName, &service)) {
ythrow yexception() << "Invalid kikimr service name " << serviceName;
}
return service;
}

void ModifyLogPriorities(std::unordered_map<NKikimrServices::EServiceKikimr, NActors::NLog::EPriority> logPriorities, NKikimrConfig::TLogConfig& logConfig) {
for (auto& entry : *logConfig.MutableEntry()) {
const auto it = logPriorities.find(GetLogService(entry.GetComponent()));
if (it != logPriorities.end()) {
entry.SetLevel(it->second);
logPriorities.erase(it);
}
}
for (const auto& [service, priority] : logPriorities) {
auto* entry = logConfig.AddEntry();
entry->SetComponent(NKikimrServices::EServiceKikimr_Name(service));
entry->SetLevel(priority);
}
}

} // namespace NKqpRun
15 changes: 11 additions & 4 deletions ydb/tests/tools/kqprun/src/common.h
Original file line number Diff line number Diff line change
@@ -1,14 +1,18 @@
#pragma once

#include <ydb/core/protos/config.pb.h>
#include <ydb/public/api/protos/ydb_cms.pb.h>
#include <ydb/core/protos/kqp.pb.h>

#include <ydb/library/actors/core/log_iface.h>
#include <ydb/library/services/services.pb.h>

#include <ydb/public/api/protos/ydb_cms.pb.h>
#include <ydb/public/lib/ydb_cli/common/formats.h>

#include <yql/essentials/minikql/computation/mkql_computation_node.h>
#include <yql/essentials/minikql/mkql_function_registry.h>
#include <yt/yql/providers/yt/provider/yql_yt_gateway.h>

#include <ydb/public/lib/ydb_cli/common/formats.h>
#include <yt/yql/providers/yt/provider/yql_yt_gateway.h>


namespace NKqpRun {
Expand Down Expand Up @@ -101,11 +105,14 @@ struct TRequestOptions {
};

template <typename TValue>
static TValue GetValue(size_t index, const std::vector<TValue>& values, TValue defaultValue) {
TValue GetValue(size_t index, const std::vector<TValue>& values, TValue defaultValue) {
if (values.empty()) {
return defaultValue;
}
return values[std::min(index, values.size() - 1)];
}

NKikimrServices::EServiceKikimr GetLogService(const TString& serviceName);
void ModifyLogPriorities(std::unordered_map<NKikimrServices::EServiceKikimr, NActors::NLog::EPriority> logPriorities, NKikimrConfig::TLogConfig& logConfig);

} // namespace NKqpRun
1 change: 1 addition & 0 deletions ydb/tests/tools/kqprun/src/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ LIBRARY()

SRCS(
actors.cpp
common.cpp
kqp_runner.cpp
ydb_setup.cpp
)
Expand Down
25 changes: 3 additions & 22 deletions ydb/tests/tools/kqprun/src/ydb_setup.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -150,13 +150,8 @@ class TYdbSetup::TImpl {
}
}

for (auto setting : Settings_.AppConfig.GetLogConfig().get_arr_entry()) {
NKikimrServices::EServiceKikimr service;
if (!NKikimrServices::EServiceKikimr_Parse(setting.GetComponent(), &service)) {
ythrow yexception() << "Invalid kikimr service name " << setting.GetComponent();
}

runtime.SetLogPriority(service, NActors::NLog::EPriority(setting.GetLevel()));
for (const auto& setting : Settings_.AppConfig.GetLogConfig().get_arr_entry()) {
runtime.SetLogPriority(GetLogService(setting.GetComponent()), NActors::NLog::EPriority(setting.GetLevel()));
}

runtime.SetLogBackendFactory([this]() { return CreateLogBackend(); });
Expand Down Expand Up @@ -316,21 +311,7 @@ class TYdbSetup::TImpl {
return;
}

bool found = false;
for (auto& entry : *Settings_.AppConfig.MutableLogConfig()->MutableEntry()) {
if (entry.GetComponent() == "KQP_YQL") {
entry.SetLevel(NActors::NLog::PRI_TRACE);
found = true;
break;
}
}

if (!found) {
auto entry = Settings_.AppConfig.MutableLogConfig()->AddEntry();
entry->SetComponent("KQP_YQL");
entry->SetLevel(NActors::NLog::PRI_TRACE);
}

ModifyLogPriorities({{NKikimrServices::EServiceKikimr::KQP_YQL, NActors::NLog::PRI_TRACE}}, *Settings_.AppConfig.MutableLogConfig());
NYql::NLog::InitLogger(NActors::CreateNullBackend());
}

Expand Down

0 comments on commit aa4e3d5

Please sign in to comment.