Skip to content

Commit

Permalink
YQL-17087: Enable output channel spilling only if task has two or mor…
Browse files Browse the repository at this point in the history
…e outputs (#1043)

* Enable output channel spilling only if task has two or more outputs
* Add handling if spilling service is not started.
TDqLocalFileSpillingActor sends events with flag to track undelivery.
Add unit tests for this.
Fix LOG macroses in channel_storage_actor.cpp to handle ActorSystem pointer also.
  • Loading branch information
Darych authored Jan 29, 2024
1 parent 93067e7 commit 11534d6
Show file tree
Hide file tree
Showing 36 changed files with 226 additions and 141 deletions.
10 changes: 9 additions & 1 deletion ydb/core/kqp/compute_actor/kqp_compute_actor_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ using namespace NYql::NDq;
class TKqpTaskRunnerExecutionContext : public TDqTaskRunnerExecutionContext {
public:
TKqpTaskRunnerExecutionContext(ui64 txId, bool withSpilling, IDqChannelStorage::TWakeUpCallback&& wakeUp)
: TDqTaskRunnerExecutionContext(txId, withSpilling, std::move(wakeUp))
: TDqTaskRunnerExecutionContext(txId, std::move(wakeUp))
, WithSpilling_(withSpilling)
{
}

Expand All @@ -26,6 +27,13 @@ class TKqpTaskRunnerExecutionContext : public TDqTaskRunnerExecutionContext {
{
return KqpBuildOutputConsumer(outputDesc, type, applyCtx, typeEnv, holderFactory, std::move(outputs));
}

IDqChannelStorage::TPtr CreateChannelStorage(ui64 channelId, bool withSpilling) const override {
return TDqTaskRunnerExecutionContext::CreateChannelStorage(channelId, WithSpilling_ || withSpilling);
}

private:
bool WithSpilling_;
};

} // namespace NKqp
Expand Down
3 changes: 1 addition & 2 deletions ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,7 @@ void TKqpComputeActor::DoBootstrap() {

auto wakeup = [this]{ ContinueExecute(); };
try {
PrepareTaskRunner(TKqpTaskRunnerExecutionContext(std::get<ui64>(TxId), RuntimeSettings.UseSpilling,
std::move(wakeup)));
PrepareTaskRunner(TKqpTaskRunnerExecutionContext(std::get<ui64>(TxId), RuntimeSettings.UseSpilling, std::move(wakeup)));
} catch (const NMiniKQL::TKqpEnsureFail& e) {
InternalError((TIssuesIds::EIssueCode) e.GetCode(), e.GetMessage());
return;
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/kqp/executer_actor/kqp_executer_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -626,7 +626,7 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
auto& record = channelsInfoEv->Record;

for (auto& channelId : channelIds) {
FillChannelDesc(TasksGraph, *record.AddUpdate(), TasksGraph.GetChannel(channelId), TasksGraph.GetMeta().ChannelTransportVersion);
FillChannelDesc(TasksGraph, *record.AddUpdate(), TasksGraph.GetChannel(channelId), TasksGraph.GetMeta().ChannelTransportVersion, false);
}

LOG_T("Sending channels info to compute actor: " << computeActorId << ", channels: " << channelIds.size());
Expand Down
14 changes: 7 additions & 7 deletions ydb/core/kqp/executer_actor/kqp_planner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,8 @@ constexpr ui32 MAX_NON_PARALLEL_TASKS_EXECUTION_LIMIT = 4;

TKqpPlanner::TKqpPlanner(TKqpTasksGraph& graph, ui64 txId, const TActorId& executer, const IKqpGateway::TKqpSnapshot& snapshot,
const TString& database, const TIntrusiveConstPtr<NACLib::TUserToken>& userToken, TInstant deadline,
const Ydb::Table::QueryStatsCollection::Mode& statsMode,
bool withSpilling, const TMaybe<NKikimrKqp::TRlPath>& rlPath, NWilson::TSpan& executerSpan,
const Ydb::Table::QueryStatsCollection::Mode& statsMode, bool withSpilling,
const TMaybe<NKikimrKqp::TRlPath>& rlPath, NWilson::TSpan& executerSpan,
TVector<NKikimrKqp::TKqpNodeResources>&& resourcesSnapshot,
const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& executerRetriesConfig,
bool useDataQueryPool, bool localComputeTasks, ui64 mkqlMemoryLimit, NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory,
Expand Down Expand Up @@ -289,7 +289,7 @@ std::unique_ptr<IEventHandle> TKqpPlanner::AssignTasksToNodes() {
auto plan = planner->Plan(ResourcesSnapshot, ResourceEstimations);

THashMap<ui64, ui64> alreadyAssigned;
for(auto& [nodeId, tasks] : TasksPerNode) {
for(auto& [nodeId, tasks] : TasksPerNode) {
for(ui64 taskId: tasks) {
alreadyAssigned.emplace(taskId, nodeId);
}
Expand Down Expand Up @@ -366,7 +366,7 @@ void TKqpPlanner::ExecuteDataComputeTask(ui64 taskId, bool shareMailbox, bool op
task.ComputeActorId = computeActorId;

LOG_D("Executing task: " << taskId << " on compute actor: " << task.ComputeActorId);

auto result = PendingComputeActors.emplace(task.ComputeActorId, TProgressStat());
YQL_ENSURE(result.second);
}
Expand Down Expand Up @@ -409,7 +409,7 @@ std::unique_ptr<IEventHandle> TKqpPlanner::PlanExecution() {
}
ComputeTasks.clear();
}

if (nComputeTasks == 0 && TasksPerNode.size() == 1 && (AsyncIoFactory != nullptr) && AllowSinglePartitionOpt) {
// query affects a single key or shard, so it might be more effective
// to execute this task locally so we can avoid useless overhead for remote task launching.
Expand Down Expand Up @@ -518,8 +518,8 @@ ui32 TKqpPlanner::CalcSendMessageFlagsForNode(ui32 nodeId) {
std::unique_ptr<TKqpPlanner> CreateKqpPlanner(TKqpTasksGraph& tasksGraph, ui64 txId, const TActorId& executer,
const IKqpGateway::TKqpSnapshot& snapshot,
const TString& database, const TIntrusiveConstPtr<NACLib::TUserToken>& userToken, TInstant deadline,
const Ydb::Table::QueryStatsCollection::Mode& statsMode,
bool withSpilling, const TMaybe<NKikimrKqp::TRlPath>& rlPath, NWilson::TSpan& executerSpan,
const Ydb::Table::QueryStatsCollection::Mode& statsMode, bool withSpilling,
const TMaybe<NKikimrKqp::TRlPath>& rlPath, NWilson::TSpan& executerSpan,
TVector<NKikimrKqp::TKqpNodeResources>&& resourcesSnapshot, const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& executerRetriesConfig,
bool useDataQueryPool, bool localComputeTasks, ui64 mkqlMemoryLimit, NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory, bool doOptimization,
const TIntrusivePtr<TUserRequestContext>& userRequestContext)
Expand Down
10 changes: 5 additions & 5 deletions ydb/core/kqp/executer_actor/kqp_planner.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@ class TKqpPlanner {
public:
TKqpPlanner(TKqpTasksGraph& tasksGraph, ui64 txId, const TActorId& executer, const IKqpGateway::TKqpSnapshot& snapshot,
const TString& database, const TIntrusiveConstPtr<NACLib::TUserToken>& userToken, TInstant deadline,
const Ydb::Table::QueryStatsCollection::Mode& statsMode,
bool withSpilling, const TMaybe<NKikimrKqp::TRlPath>& rlPath, NWilson::TSpan& ExecuterSpan,
const Ydb::Table::QueryStatsCollection::Mode& statsMode, bool withSpilling,
const TMaybe<NKikimrKqp::TRlPath>& rlPath, NWilson::TSpan& ExecuterSpan,
TVector<NKikimrKqp::TKqpNodeResources>&& resourcesSnapshot, const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& executerRetriesConfig,
bool useDataQueryPool, bool localComputeTasks, ui64 mkqlMemoryLimit, NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory, bool allowSinglePartitionOpt,
const TIntrusivePtr<TUserRequestContext>& userRequestContext);
Expand All @@ -63,7 +63,7 @@ class TKqpPlanner {
ui32 GetnComputeTasks();

private:

const IKqpGateway::TKqpSnapshot& GetSnapshot() const;
void ExecuteDataComputeTask(ui64 taskId, bool shareMailbox, bool optimizeProtoForLocalExecution);
void PrepareToProcess();
Expand Down Expand Up @@ -114,8 +114,8 @@ class TKqpPlanner {
std::unique_ptr<TKqpPlanner> CreateKqpPlanner(TKqpTasksGraph& tasksGraph, ui64 txId, const TActorId& executer,
const IKqpGateway::TKqpSnapshot& snapshot,
const TString& database, const TIntrusiveConstPtr<NACLib::TUserToken>& userToken, TInstant deadline,
const Ydb::Table::QueryStatsCollection::Mode& statsMode,
bool withSpilling, const TMaybe<NKikimrKqp::TRlPath>& rlPath, NWilson::TSpan& executerSpan,
const Ydb::Table::QueryStatsCollection::Mode& statsMode, bool withSpilling,
const TMaybe<NKikimrKqp::TRlPath>& rlPath, NWilson::TSpan& executerSpan,
TVector<NKikimrKqp::TKqpNodeResources>&& resourcesSnapshot,
const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& ExecuterRetriesConfig,
bool useDataQueryPool, bool localComputeTasks,
Expand Down
15 changes: 10 additions & 5 deletions ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -762,12 +762,13 @@ void FillEndpointDesc(NDqProto::TEndpoint& endpoint, const TTask& task) {
}

void FillChannelDesc(const TKqpTasksGraph& tasksGraph, NDqProto::TChannel& channelDesc, const TChannel& channel,
const NKikimrConfig::TTableServiceConfig::EChannelTransportVersion chanTransportVersion) {
const NKikimrConfig::TTableServiceConfig::EChannelTransportVersion chanTransportVersion, bool enableSpilling) {
channelDesc.SetId(channel.Id);
channelDesc.SetSrcStageId(channel.SrcStageId.StageId);
channelDesc.SetDstStageId(channel.DstStageId.StageId);
channelDesc.SetSrcTaskId(channel.SrcTask);
channelDesc.SetDstTaskId(channel.DstTask);
channelDesc.SetEnableSpilling(enableSpilling);

const auto& resultChannelProxies = tasksGraph.GetMeta().ResultChannelProxies;

Expand Down Expand Up @@ -980,7 +981,7 @@ void FillTaskMeta(const TStageInfo& stageInfo, const TTask& task, NYql::NDqProto
}
}

void FillOutputDesc(const TKqpTasksGraph& tasksGraph, NYql::NDqProto::TTaskOutput& outputDesc, const TTaskOutput& output) {
void FillOutputDesc(const TKqpTasksGraph& tasksGraph, NYql::NDqProto::TTaskOutput& outputDesc, const TTaskOutput& output, bool enableSpilling) {
switch (output.Type) {
case TTaskOutputType::Map:
YQL_ENSURE(output.Channels.size() == 1);
Expand Down Expand Up @@ -1040,7 +1041,7 @@ void FillOutputDesc(const TKqpTasksGraph& tasksGraph, NYql::NDqProto::TTaskOutpu

for (auto& channel : output.Channels) {
auto& channelDesc = *outputDesc.AddChannels();
FillChannelDesc(tasksGraph, channelDesc, tasksGraph.GetChannel(channel), tasksGraph.GetMeta().ChannelTransportVersion);
FillChannelDesc(tasksGraph, channelDesc, tasksGraph.GetChannel(channel), tasksGraph.GetMeta().ChannelTransportVersion, enableSpilling);
}
}

Expand Down Expand Up @@ -1092,7 +1093,7 @@ void FillInputDesc(const TKqpTasksGraph& tasksGraph, NYql::NDqProto::TTaskInput&

for (ui64 channel : input.Channels) {
auto& channelDesc = *inputDesc.AddChannels();
FillChannelDesc(tasksGraph, channelDesc, tasksGraph.GetChannel(channel), tasksGraph.GetMeta().ChannelTransportVersion);
FillChannelDesc(tasksGraph, channelDesc, tasksGraph.GetChannel(channel), tasksGraph.GetMeta().ChannelTransportVersion, false);
}

if (input.Transform) {
Expand Down Expand Up @@ -1141,8 +1142,12 @@ void SerializeTaskToProto(const TKqpTasksGraph& tasksGraph, const TTask& task, N
FillInputDesc(tasksGraph, *result->AddInputs(), input, serializeAsyncIoSettings);
}

bool enableSpilling = false;
if (task.Outputs.size() > 1) {
enableSpilling = AppData()->EnableKqpSpilling;
}
for (const auto& output : task.Outputs) {
FillOutputDesc(tasksGraph, *result->AddOutputs(), output);
FillOutputDesc(tasksGraph, *result->AddOutputs(), output, enableSpilling);
}

const NKqpProto::TKqpPhyStage& stage = stageInfo.Meta.GetStage(stageInfo.Id);
Expand Down
4 changes: 2 additions & 2 deletions ydb/core/kqp/executer_actor/kqp_tasks_graph.h
Original file line number Diff line number Diff line change
Expand Up @@ -256,8 +256,8 @@ void BuildKqpStageChannels(TKqpTasksGraph& tasksGraph, const TStageInfo& stageIn
NYql::NDqProto::TDqTask* ArenaSerializeTaskToProto(TKqpTasksGraph& tasksGraph, const TTask& task, bool serializeAsyncIoSettings);
void SerializeTaskToProto(const TKqpTasksGraph& tasksGraph, const TTask& task, NYql::NDqProto::TDqTask* message, bool serializeAsyncIoSettings);
void FillTableMeta(const TStageInfo& stageInfo, NKikimrTxDataShard::TKqpTransaction_TTableMeta* meta);
void FillChannelDesc(const TKqpTasksGraph& tasksGraph, NYql::NDqProto::TChannel& channelDesc,
const NYql::NDq::TChannel& channel, const NKikimrConfig::TTableServiceConfig::EChannelTransportVersion chanTransportVersion);
void FillChannelDesc(const TKqpTasksGraph& tasksGraph, NYql::NDqProto::TChannel& channelDesc, const NYql::NDq::TChannel& channel,
const NKikimrConfig::TTableServiceConfig::EChannelTransportVersion chanTransportVersion, bool enableSpilling);

template<typename Proto>
TVector<TTaskMeta::TColumn> BuildKqpColumns(const Proto& op, TIntrusiveConstPtr<TTableConstInfo> tableInfo) {
Expand Down
4 changes: 2 additions & 2 deletions ydb/core/tx/datashard/datashard_kqp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1105,11 +1105,11 @@ class TKqpTaskRunnerExecutionContext : public NDq::IDqTaskRunnerExecutionContext
return NKqp::KqpBuildOutputConsumer(outputDesc, type, applyCtx, typeEnv, holderFactory, std::move(outputs));
}

NDq::IDqChannelStorage::TPtr CreateChannelStorage(ui64 /* channelId */) const override {
NDq::IDqChannelStorage::TPtr CreateChannelStorage(ui64 /* channelId */, bool /* withSpilling */) const override {
return {};
}

NDq::IDqChannelStorage::TPtr CreateChannelStorage(ui64 /* channelId */, TActorSystem* /* actorSystem */, bool /*isConcurrent*/) const override {
NDq::IDqChannelStorage::TPtr CreateChannelStorage(ui64 /* channelId */, bool /* withSpilling */, TActorSystem* /* actorSystem */, bool /*isConcurrent*/) const override {
return {};
}
};
Expand Down
3 changes: 1 addition & 2 deletions ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,7 @@ class TDqAsyncComputeActor : public TDqComputeActorBase<TDqAsyncComputeActor>
Become(&TDqAsyncComputeActor::StateFuncWrapper<&TDqAsyncComputeActor::StateFuncBody>);

auto wakeup = [this]{ ContinueExecute(EResumeSource::CABootstrapWakeup); };
std::shared_ptr<IDqTaskRunnerExecutionContext> execCtx = std::make_shared<TDqTaskRunnerExecutionContext>(
TxId, RuntimeSettings.UseSpilling, std::move(wakeup));
std::shared_ptr<IDqTaskRunnerExecutionContext> execCtx = std::make_shared<TDqTaskRunnerExecutionContext>(TxId, std::move(wakeup));

Send(TaskRunnerActorId,
new NTaskRunnerActor::TEvTaskRunnerCreate(
Expand Down
2 changes: 1 addition & 1 deletion ydb/library/yql/dq/actors/compute/dq_compute_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ class TDqComputeActor : public TDqSyncComputeActorBase<TDqComputeActor> {
auto taskRunner = TaskRunnerFactory(GetAllocator(), Task, RuntimeSettings.StatsMode, logger);
SetTaskRunner(taskRunner);
auto wakeup = [this]{ ContinueExecute(EResumeSource::CABootstrapWakeup); };
TDqTaskRunnerExecutionContext execCtx(TxId, RuntimeSettings.UseSpilling, std::move(wakeup));
TDqTaskRunnerExecutionContext execCtx(TxId, std::move(wakeup));
PrepareTaskRunner(execCtx);

ContinueExecute(EResumeSource::CABootstrap);
Expand Down
2 changes: 1 addition & 1 deletion ydb/library/yql/dq/actors/compute/dq_compute_actor.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ struct TEvDqCompute {
struct TEvStateRequest : public NActors::TEventPB<TEvStateRequest, NDqProto::TEvComputeStateRequest, TDqComputeEvents::EvStateRequest> {};

struct TEvResumeExecution : public NActors::TEventLocal<TEvResumeExecution, TDqComputeEvents::EvResumeExecution> {
TEvResumeExecution(EResumeSource source)
TEvResumeExecution(EResumeSource source)
: Source(source)
{ }

Expand Down
11 changes: 5 additions & 6 deletions ydb/library/yql/dq/actors/compute/dq_task_runner_exec_ctx.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,18 @@
namespace NYql {
namespace NDq {

TDqTaskRunnerExecutionContext::TDqTaskRunnerExecutionContext(TTxId txId, bool withSpilling, IDqChannelStorage::TWakeUpCallback&& wakeUp)
TDqTaskRunnerExecutionContext::TDqTaskRunnerExecutionContext(TTxId txId, IDqChannelStorage::TWakeUpCallback&& wakeUp)
: TxId_(txId)
, WakeUp_(std::move(wakeUp))
, WithSpilling_(withSpilling)
{
}

IDqChannelStorage::TPtr TDqTaskRunnerExecutionContext::CreateChannelStorage(ui64 channelId) const {
return CreateChannelStorage(channelId, nullptr, false);
IDqChannelStorage::TPtr TDqTaskRunnerExecutionContext::CreateChannelStorage(ui64 channelId, bool withSpilling) const {
return CreateChannelStorage(channelId, withSpilling, nullptr, false);
}

IDqChannelStorage::TPtr TDqTaskRunnerExecutionContext::CreateChannelStorage(ui64 channelId, NActors::TActorSystem* actorSystem, bool isConcurrent) const {
if (WithSpilling_) {
IDqChannelStorage::TPtr TDqTaskRunnerExecutionContext::CreateChannelStorage(ui64 channelId, bool withSpilling, NActors::TActorSystem* actorSystem, bool isConcurrent) const {
if (withSpilling) {
return CreateDqChannelStorage(TxId_, channelId, WakeUp_, actorSystem, isConcurrent);
} else {
return nullptr;
Expand Down
7 changes: 3 additions & 4 deletions ydb/library/yql/dq/actors/compute/dq_task_runner_exec_ctx.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,14 @@ namespace NDq {

class TDqTaskRunnerExecutionContext : public TDqTaskRunnerExecutionContextBase {
public:
TDqTaskRunnerExecutionContext(TTxId txId, bool withSpilling, IDqChannelStorage::TWakeUpCallback&& wakeUp);
TDqTaskRunnerExecutionContext(TTxId txId, IDqChannelStorage::TWakeUpCallback&& wakeUp);

IDqChannelStorage::TPtr CreateChannelStorage(ui64 channelId) const override;
IDqChannelStorage::TPtr CreateChannelStorage(ui64 channelId, NActors::TActorSystem* actorSystem, bool isConcurrent) const override;
IDqChannelStorage::TPtr CreateChannelStorage(ui64 channelId, bool withSpilling) const override;
IDqChannelStorage::TPtr CreateChannelStorage(ui64 channelId, bool withSpilling, NActors::TActorSystem* actorSystem, bool isConcurrent) const override;

private:
const TTxId TxId_;
const IDqChannelStorage::TWakeUpCallback WakeUp_;
const bool WithSpilling_;
};

} // namespace NDq
Expand Down
59 changes: 47 additions & 12 deletions ydb/library/yql/dq/actors/spilling/channel_storage_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,53 @@ using namespace NActors;

namespace {

#define LOG_D(s) \
LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::KQP_COMPUTE, "TxId: " << TxId_ << ", channelId: " << ChannelId_ << ". " << s)
#define LOG_I(s) \
LOG_INFO_S(*TlsActivationContext, NKikimrServices::KQP_COMPUTE, "TxId: " << TxId << ", channelId: " << ChannelId << ". " << s)
#define LOG_E(s) \
LOG_ERROR_S(*TlsActivationContext, NKikimrServices::KQP_COMPUTE, "TxId: " << TxId_ << ", channelId: " << ChannelId_ << ". " << s)
#define LOG_C(s) \
LOG_CRIT_S(*TlsActivationContext, NKikimrServices::KQP_COMPUTE, "TxId: " << TxId << ", channelId: " << ChannelId << ". " << s)
#define LOG_W(s) \
LOG_WARN_S(*TlsActivationContext, NKikimrServices::KQP_COMPUTE, "TxId: " << TxId << ", channelId: " << ChannelId << ". " << s)
#define LOG_T(s) \
LOG_TRACE_S(*TlsActivationContext, NKikimrServices::KQP_COMPUTE, "TxId: " << TxId_ << ", channelId: " << ChannelId_ << ". " << s)
#define LOG_D(s) { \
if (ActorSystem_) { \
LOG_DEBUG_S(*ActorSystem_, NKikimrServices::KQP_COMPUTE, "TxId: " << TxId_ << ", channelId: " << ChannelId_ << ". " << s); \
} else { \
LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::KQP_COMPUTE, "TxId: " << TxId_ << ", channelId: " << ChannelId_ << ". " << s); \
} \
}

#define LOG_I(s) { \
if (ActorSystem_) { \
LOG_INFO_S(*ActorSystem_, NKikimrServices::KQP_COMPUTE, "TxId: " << TxId << ", channelId: " << ChannelId << ". " << s); \
} else { \
LOG_INFO_S(*TlsActivationContext, NKikimrServices::KQP_COMPUTE, "TxId: " << TxId << ", channelId: " << ChannelId << ". " << s); \
} \
}

#define LOG_E(s) { \
if (ActorSystem_) { \
LOG_ERROR_S(*ActorSystem_, NKikimrServices::KQP_COMPUTE, "TxId: " << TxId_ << ", channelId: " << ChannelId_ << ". " << s); \
} else { \
LOG_ERROR_S(*TlsActivationContext, NKikimrServices::KQP_COMPUTE, "TxId: " << TxId_ << ", channelId: " << ChannelId_ << ". " << s); \
} \
}

#define LOG_C(s) { \
if (ActorSystem_) { \
LOG_CRIT_S(*ActorSystem_, NKikimrServices::KQP_COMPUTE, "TxId: " << TxId << ", channelId: " << ChannelId << ". " << s); \
} else { \
LOG_CRIT_S(*TlsActivationContext, NKikimrServices::KQP_COMPUTE, "TxId: " << TxId << ", channelId: " << ChannelId << ". " << s); \
} \
}

#define LOG_W(s) { \
if (ActorSystem_) { \
LOG_WARN_S(*ActorSystem_, NKikimrServices::KQP_COMPUTE, "TxId: " << TxId << ", channelId: " << ChannelId << ". " << s); \
} else { \
LOG_WARN_S(*TlsActivationContext, NKikimrServices::KQP_COMPUTE, "TxId: " << TxId << ", channelId: " << ChannelId << ". " << s); \
} \
}

#define LOG_T(s) { \
if (ActorSystem_) { \
LOG_TRACE_S(*ActorSystem_, NKikimrServices::KQP_COMPUTE, "TxId: " << TxId_ << ", channelId: " << ChannelId_ << ". " << s); \
} else { \
LOG_TRACE_S(*TlsActivationContext, NKikimrServices::KQP_COMPUTE, "TxId: " << TxId_ << ", channelId: " << ChannelId_ << ". " << s); \
} \
}

constexpr ui32 MAX_INFLIGHT_BLOBS_COUNT = 10;
constexpr ui64 MAX_INFLIGHT_BLOBS_SIZE = 50_MB;
Expand Down
Loading

0 comments on commit 11534d6

Please sign in to comment.