Skip to content

Commit

Permalink
Merge fa2e3d6 into 20d9d44
Browse files Browse the repository at this point in the history
  • Loading branch information
zverevgeny authored Jan 11, 2025
2 parents 20d9d44 + fa2e3d6 commit c991ba0
Show file tree
Hide file tree
Showing 49 changed files with 4,096 additions and 659 deletions.
20 changes: 20 additions & 0 deletions ydb/core/kqp/common/buffer/buffer.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
#pragma once

#include <ydb/core/scheme/scheme_tabledefs.h>
#include <ydb/core/kqp/common/kqp_tx_manager.h>

namespace NKikimr {
namespace NKqp {

struct TKqpBufferWriterSettings {
TActorId SessionActorId;
IKqpTransactionManagerPtr TxManager;
NWilson::TTraceId TraceId;
TIntrusivePtr<TKqpCounters> Counters;
TIntrusivePtr<NTxProxy::TTxProxyMon> TxProxyMon;
};

NActors::IActor* CreateKqpBufferWriterActor(TKqpBufferWriterSettings&& settings);

}
}
16 changes: 16 additions & 0 deletions ydb/core/kqp/common/buffer/events.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
#include "events.h"

namespace NKikimr {
namespace NKqp {

TEvKqpBuffer::TEvError::TEvError(
const TString& message,
NYql::NDqProto::StatusIds::StatusCode statusCode,
const NYql::TIssues& subIssues)
: Message(message)
, StatusCode(statusCode)
, SubIssues(subIssues) {
}

}
}
52 changes: 52 additions & 0 deletions ydb/core/kqp/common/buffer/events.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
#pragma once

#include <ydb/core/kqp/common/simple/kqp_event_ids.h>
#include <ydb/library/yql/dq/actors/protos/dq_stats.pb.h>
#include <ydb/library/yql/dq/actors/protos/dq_status_codes.pb.h>
#include <ydb/library/yql/public/issue/yql_issue.h>


namespace NKikimr {
namespace NKqp {

struct TEvKqpBuffer {

struct TEvPrepare : public TEventLocal<TEvPrepare, TKqpBufferWriterEvents::EvPrepare> {
TActorId ExecuterActorId;
};

struct TEvCommit : public TEventLocal<TEvCommit, TKqpBufferWriterEvents::EvCommit> {
TActorId ExecuterActorId;
ui64 TxId;
};

struct TEvRollback : public TEventLocal<TEvRollback, TKqpBufferWriterEvents::EvRollback> {
TActorId ExecuterActorId;
};

struct TEvFlush : public TEventLocal<TEvFlush, TKqpBufferWriterEvents::EvFlush> {
TActorId ExecuterActorId;
};

struct TEvResult : public TEventLocal<TEvResult, TKqpBufferWriterEvents::EvResult> {
TEvResult() = default;
TEvResult(NYql::NDqProto::TDqTaskStats&& stats) : Stats(std::move(stats)) {}

std::optional<NYql::NDqProto::TDqTaskStats> Stats;
};

struct TEvError : public TEventLocal<TEvError, TKqpBufferWriterEvents::EvError> {
TString Message;
NYql::NDqProto::StatusIds::StatusCode StatusCode;
NYql::TIssues SubIssues;

TEvError(const TString& message, NYql::NDqProto::StatusIds::StatusCode statusCode, const NYql::TIssues& subIssues);
};

struct TEvTerminate : public TEventLocal<TEvTerminate, TKqpBufferWriterEvents::EvTerminate> {
};

};

}
}
14 changes: 14 additions & 0 deletions ydb/core/kqp/common/buffer/ya.make
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
LIBRARY()

SRCS(
events.cpp
)

PEERDIR(
ydb/core/kqp/common/simple
ydb/library/yql/public/issue
)

YQL_LAST_ABI_VERSION()

END()
36 changes: 5 additions & 31 deletions ydb/core/kqp/common/kqp_tx.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,6 @@ bool NeedSnapshot(const TKqpTransactionContext& txCtx, const NYql::TKikimrConfig

size_t readPhases = 0;
bool hasEffects = false;
bool hasSourceRead = false;
bool hasStreamLookup = false;
bool hasSinkWrite = false;

Expand All @@ -191,7 +190,6 @@ bool NeedSnapshot(const TKqpTransactionContext& txCtx, const NYql::TKikimrConfig
}

for (const auto &stage : tx.GetStages()) {
hasSourceRead |= !stage.GetSources().empty();
hasSinkWrite |= !stage.GetSinks().empty();

for (const auto &input : stage.GetInputs()) {
Expand All @@ -211,9 +209,7 @@ bool NeedSnapshot(const TKqpTransactionContext& txCtx, const NYql::TKikimrConfig
return true;
}

if ((hasSourceRead || hasStreamLookup) && hasSinkWrite) {
return true;
}
YQL_ENSURE(!hasSinkWrite || hasEffects);

// We don't want snapshot when there are effects at the moment,
// because it hurts performance when there are multiple single-shard
Expand Down Expand Up @@ -251,23 +247,12 @@ bool HasOlapTableReadInTx(const NKqpProto::TKqpPhyQuery& physicalQuery) {
return false;
}

bool HasOlapTableWriteInStage(const NKqpProto::TKqpPhyStage& stage, const google::protobuf::RepeatedPtrField< ::NKqpProto::TKqpPhyTable>& tables) {
bool HasOlapTableWriteInStage(const NKqpProto::TKqpPhyStage& stage) {
for (const auto& sink : stage.GetSinks()) {
if (sink.GetTypeCase() == NKqpProto::TKqpSink::kInternalSink && sink.GetInternalSink().GetSettings().Is<NKikimrKqp::TKqpTableSinkSettings>()) {
NKikimrKqp::TKqpTableSinkSettings settings;
YQL_ENSURE(sink.GetInternalSink().GetSettings().UnpackTo(&settings), "Failed to unpack settings");

const bool isOlapSink = std::any_of(
std::begin(tables),
std::end(tables),
[&](const NKqpProto::TKqpPhyTable& table) {
return table.GetKind() == NKqpProto::EKqpPhyTableKind::TABLE_KIND_OLAP
&& google::protobuf::util::MessageDifferencer::Equals(table.GetId(), settings.GetTable());
});

if (isOlapSink) {
return true;
}
return settings.GetIsOlap();
}
}
return false;
Expand All @@ -276,7 +261,7 @@ bool HasOlapTableWriteInStage(const NKqpProto::TKqpPhyStage& stage, const google
bool HasOlapTableWriteInTx(const NKqpProto::TKqpPhyQuery& physicalQuery) {
for (const auto &tx : physicalQuery.GetTransactions()) {
for (const auto &stage : tx.GetStages()) {
if (HasOlapTableWriteInStage(stage, tx.GetTables())) {
if (HasOlapTableWriteInStage(stage)) {
return true;
}
}
Expand Down Expand Up @@ -325,18 +310,7 @@ bool HasOltpTableWriteInTx(const NKqpProto::TKqpPhyQuery& physicalQuery) {
if (sink.GetTypeCase() == NKqpProto::TKqpSink::kInternalSink && sink.GetInternalSink().GetSettings().Is<NKikimrKqp::TKqpTableSinkSettings>()) {
NKikimrKqp::TKqpTableSinkSettings settings;
YQL_ENSURE(sink.GetInternalSink().GetSettings().UnpackTo(&settings), "Failed to unpack settings");

const bool isOltpSink = std::any_of(
std::begin(tx.GetTables()),
std::end(tx.GetTables()),
[&](const NKqpProto::TKqpPhyTable& table) {
return table.GetKind() == NKqpProto::EKqpPhyTableKind::TABLE_KIND_DS
&& google::protobuf::util::MessageDifferencer::Equals(table.GetId(), settings.GetTable());
});

if (isOltpSink) {
return true;
}
return !settings.GetIsOlap();
}
}
}
Expand Down
16 changes: 7 additions & 9 deletions ydb/core/kqp/common/kqp_tx.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

#include <ydb/core/base/feature_flags.h>
#include <ydb/core/kqp/common/kqp_yql.h>
#include <ydb/core/kqp/common/kqp_tx_manager.h>
#include <ydb/core/kqp/gateway/kqp_gateway.h>
#include <ydb/core/kqp/provider/yql_kikimr_provider.h>

Expand Down Expand Up @@ -121,12 +122,6 @@ struct TDeferredEffects {
friend class TKqpTransactionContext;
};

struct TTableInfo {
bool IsOlap = false;
THashSet<TStringBuf> Pathes;
};


class TShardIdToTableInfo {
public:
const TTableInfo& Get(ui64 shardId) const {
Expand Down Expand Up @@ -204,6 +199,8 @@ class TKqpTransactionContext : public NYql::TKikimrTransactionContextBase {
void Finish() final {
YQL_ENSURE(DeferredEffects.Empty());
YQL_ENSURE(!Locks.HasLocks());
YQL_ENSURE(!TxManager);
YQL_ENSURE(!BufferActorId);

FinishTime = TInstant::Now();

Expand Down Expand Up @@ -350,6 +347,9 @@ class TKqpTransactionContext : public NYql::TKikimrTransactionContextBase {
bool NeedUncommittedChangesFlush = false;
THashSet<NKikimr::TTableId> ModifiedTablesSinceLastFlush;

TActorId BufferActorId;
IKqpTransactionManagerPtr TxManager = nullptr;

TShardIdToTableInfoPtr ShardIdToTableInfo = std::make_shared<TShardIdToTableInfo>();
};

Expand Down Expand Up @@ -507,9 +507,7 @@ bool NeedSnapshot(const TKqpTransactionContext& txCtx, const NYql::TKikimrConfig
bool commitTx, const NKqpProto::TKqpPhyQuery& physicalQuery);

bool HasOlapTableReadInTx(const NKqpProto::TKqpPhyQuery& physicalQuery);
bool HasOlapTableWriteInStage(
const NKqpProto::TKqpPhyStage& stage,
const google::protobuf::RepeatedPtrField< ::NKqpProto::TKqpPhyTable>& tables);
bool HasOlapTableWriteInStage(const NKqpProto::TKqpPhyStage& stage);
bool HasOlapTableWriteInTx(const NKqpProto::TKqpPhyQuery& physicalQuery);
bool HasOltpTableReadInTx(const NKqpProto::TKqpPhyQuery& physicalQuery);
bool HasOltpTableWriteInTx(const NKqpProto::TKqpPhyQuery& physicalQuery);
Expand Down
Loading

0 comments on commit c991ba0

Please sign in to comment.