Skip to content

Commit

Permalink
tx interactions manager (#8050)
Browse files Browse the repository at this point in the history
Co-authored-by: Nikita Vasilev <ns-vasilev@ydb.tech>
  • Loading branch information
ivanmorozov333 and nikvas0 authored Sep 2, 2024
1 parent 945b20f commit 1af55df
Show file tree
Hide file tree
Showing 113 changed files with 3,790 additions and 1,054 deletions.
4 changes: 2 additions & 2 deletions ydb/core/kqp/compute_actor/kqp_compute_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ namespace NKikimr::NKqp {
using namespace NYql::NDq;
using namespace NYql::NDqProto;

IActor* CreateKqpScanComputeActor(const TActorId& executerId, ui64 txId, ui64 lockTxId, ui32 lockNodeId,
IActor* CreateKqpScanComputeActor(const TActorId& executerId, ui64 txId, TMaybe<ui64> lockTxId, ui32 lockNodeId,
TDqTask* task, IDqAsyncIoFactory::TPtr asyncIoFactory,
const NYql::NDq::TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits, NWilson::TTraceId traceId,
TIntrusivePtr<NActors::TProtoArenaHolder> arena, TComputeActorSchedulingOptions schedulingOptions) {
Expand All @@ -141,7 +141,7 @@ IActor* CreateKqpScanComputeActor(const TActorId& executerId, ui64 txId, ui64 lo

IActor* CreateKqpScanFetcher(const NKikimrKqp::TKqpSnapshot& snapshot, std::vector<NActors::TActorId>&& computeActors,
const NKikimrTxDataShard::TKqpTransaction::TScanTaskMeta& meta, const NYql::NDq::TComputeRuntimeSettings& settings,
const ui64 txId, ui64 lockTxId, ui32 lockNodeId, const TShardsScanningPolicy& shardsScanningPolicy, TIntrusivePtr<TKqpCounters> counters, NWilson::TTraceId traceId) {
const ui64 txId, TMaybe<ui64> lockTxId, ui32 lockNodeId, const TShardsScanningPolicy& shardsScanningPolicy, TIntrusivePtr<TKqpCounters> counters, NWilson::TTraceId traceId) {
return new NScanPrivate::TKqpScanFetcherActor(snapshot, settings, std::move(computeActors), txId, lockTxId, lockNodeId, meta, shardsScanningPolicy, counters, std::move(traceId));
}

Expand Down
4 changes: 2 additions & 2 deletions ydb/core/kqp/compute_actor/kqp_compute_actor.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,14 +51,14 @@ IActor* CreateKqpComputeActor(const TActorId& executerId, ui64 txId, NYql::NDqPr
TIntrusivePtr<NActors::TProtoArenaHolder> arena,
const std::optional<TKqpFederatedQuerySetup>& federatedQuerySetup, const TGUCSettings::TPtr& GUCSettings, TComputeActorSchedulingOptions);

IActor* CreateKqpScanComputeActor(const TActorId& executerId, ui64 txId, ui64 lockTxId, ui32 lockNodeId,
IActor* CreateKqpScanComputeActor(const TActorId& executerId, ui64 txId, TMaybe<ui64> lockTxId, ui32 lockNodeId,
NYql::NDqProto::TDqTask* task, NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory,
const NYql::NDq::TComputeRuntimeSettings& settings, const NYql::NDq::TComputeMemoryLimits& memoryLimits, NWilson::TTraceId traceId,
TIntrusivePtr<NActors::TProtoArenaHolder> arena, TComputeActorSchedulingOptions);

IActor* CreateKqpScanFetcher(const NKikimrKqp::TKqpSnapshot& snapshot, std::vector<NActors::TActorId>&& computeActors,
const NKikimrTxDataShard::TKqpTransaction::TScanTaskMeta& meta, const NYql::NDq::TComputeRuntimeSettings& settings,
const ui64 txId, ui64 lockTxId, ui32 lockNodeId, const TShardsScanningPolicy& shardsScanningPolicy, TIntrusivePtr<TKqpCounters> counters, NWilson::TTraceId traceId);
const ui64 txId, TMaybe<ui64> lockTxId, ui32 lockNodeId, const TShardsScanningPolicy& shardsScanningPolicy, TIntrusivePtr<TKqpCounters> counters, NWilson::TTraceId traceId);

NYql::NDq::IDqAsyncIoFactory::TPtr CreateKqpAsyncIoFactory(
TIntrusivePtr<TKqpCounters> counters,
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/kqp/compute_actor/kqp_compute_actor_factory.h
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ struct IKqpNodeComputeActorFactory {
struct TCreateArgs {
const NActors::TActorId& ExecuterId;
const ui64 TxId;
const ui64 LockTxId;
const TMaybe<ui64> LockTxId;
const ui32 LockNodeId;
NYql::NDqProto::TDqTask* Task;
TIntrusivePtr<NRm::TTxState> TxInfo;
Expand Down
290 changes: 150 additions & 140 deletions ydb/core/kqp/compute_actor/kqp_compute_events.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,165 +16,176 @@ struct TLocksInfo {
TVector<NKikimrDataEvents::TLock> BrokenLocks;
};

struct TEvKqpCompute {
struct TEvRemoteScanData : public TEventPB<TEvRemoteScanData, NKikimrKqp::TEvRemoteScanData,
TKqpComputeEvents::EvRemoteScanData> {};
namespace NInternalImplementation {
// Protobuf-backed event: record type NKikimrKqp::TEvRemoteScanData, event id
// TKqpComputeEvents::EvRemoteScanData. TEvScanData builds one of these as its
// serialized form and reconstructs itself from one in Load().
struct TEvRemoteScanData: public TEventPB<TEvRemoteScanData, NKikimrKqp::TEvRemoteScanData,
TKqpComputeEvents::EvRemoteScanData> {
};

class IShardScanStats {
public:
virtual ~IShardScanStats() = default;
virtual THashMap<TString, ui64> GetMetrics() const = 0;
};
// Abstract interface for statistics attached to a scan-data event.
// TEvScanData keeps an instance in StatsOnFinished and casts it to a concrete type on demand.
class IShardScanStats {
public:
virtual ~IShardScanStats() = default;
// Returns the metrics as a TString -> ui64 map (keys are presumably metric names; confirm with implementers).
virtual THashMap<TString, ui64> GetMetrics() const = 0;
};

/*
* Scan communications.
*
* TEvScanData is intentionally kept as a local event for performance reasons: leaf compute
* actors communicate with shard scans using this message, so a large amount of unfiltered data
* is expected. However, a datashard may migrate to another node after query planning. To support
* scans in that case we provide serialization routines. For now such a remote scan is considered
* a rare event that does not justify a faster serialization format, so we simply use protobuf.
*
* TEvScanDataAck follows the same pattern, mostly for symmetry.
*/
struct TEvScanData : public NActors::TEventLocal<TEvScanData, TKqpComputeEvents::EvScanData> {
TEvScanData(const ui32 scanId, const ui32 generation = 0)
: ScanId(scanId)
, Generation(generation)
, Finished(false) {}

std::optional<ui32> AvailablePacks;
ui32 ScanId;
ui32 Generation;
TVector<TOwnedCellVec> Rows;
std::shared_ptr<arrow::Table> ArrowBatch;
std::vector<std::vector<ui32>> SplittedBatches;

TOwnedCellVec LastKey;
TDuration CpuTime;
TDuration WaitTime;
ui32 PageFaults = 0; // number of page faults occurred when filling in this message
bool RequestedBytesLimitReached = false;
bool Finished = false;
bool PageFault = false; // page fault was the reason for sending this message
mutable THolder<TEvRemoteScanData> Remote;
std::shared_ptr<IShardScanStats> StatsOnFinished;
TLocksInfo LocksInfo;

template <class T>
const T& GetStatsAs() const {
Y_ABORT_UNLESS(!!StatsOnFinished);
return VerifyDynamicCast<const T&>(*StatsOnFinished);
/*
* Scan communications.
*
* TEvScanData is intentionally kept as a local event for performance reasons: leaf compute
* actors communicate with shard scans using this message, so a large amount of unfiltered data
* is expected. However, a datashard may migrate to another node after query planning. To support
* scans in that case we provide serialization routines. For now such a remote scan is considered
* a rare event that does not justify a faster serialization format, so we simply use protobuf.
*
* TEvScanDataAck follows the same pattern, mostly for symmetry.
*/
struct TEvScanData: public NActors::TEventLocal<TEvScanData, TKqpComputeEvents::EvScanData> {
// Creates an event for the given scan id and generation (generation defaults to 0).
// Finished starts as false; the explicit initializer repeats the member's in-class default.
TEvScanData(const ui32 scanId, const ui32 generation = 0)
: ScanId(scanId)
, Generation(generation)
, Finished(false) {
}

std::optional<ui32> AvailablePacks;
ui32 ScanId;
ui32 Generation;
TVector<TOwnedCellVec> Rows;
std::shared_ptr<arrow::Table> ArrowBatch;
std::vector<std::vector<ui32>> SplittedBatches;

TOwnedCellVec LastKey;
TDuration CpuTime;
TDuration WaitTime;
ui32 PageFaults = 0; // number of page faults occurred when filling in this message
bool RequestedBytesLimitReached = false;
bool Finished = false;
bool PageFault = false; // page fault was the reason for sending this message
mutable THolder<TEvRemoteScanData> Remote;
std::shared_ptr<IShardScanStats> StatsOnFinished;
TLocksInfo LocksInfo;

// Returns StatsOnFinished viewed as the concrete stats type T.
// Aborts via Y_ABORT_UNLESS when no stats object is attached. The cast goes through
// VerifyDynamicCast, which presumably fails hard on a type mismatch (confirm against its definition).
template <class T>
const T& GetStatsAs() const {
Y_ABORT_UNLESS(!!StatsOnFinished);
return VerifyDynamicCast<const T&>(*StatsOnFinished);
}

// Reports whether StatsOnFinished points to an object that can be viewed as T.
// Yields false when no stats object is attached, since dynamic_cast of a null pointer is null.
template <class T>
bool CheckStatsIs() const {
    return dynamic_cast<const T*>(StatsOnFinished.get()) != nullptr;
}

// Number of rows carried by this event: taken from the Arrow table when one is attached,
// otherwise from the cell-vector rows.
ui32 GetRowsCount() const {
    if (!ArrowBatch) {
        return Rows.size();
    }
    return ArrowBatch->num_rows();
}

template <class T>
bool CheckStatsIs() const {
auto p = dynamic_cast<const T*>(StatsOnFinished.get());
return p;
}
// True when GetRowsCount() reports zero rows.
bool IsEmpty() const {
return GetRowsCount() == 0;
}

ui32 GetRowsCount() const {
if (ArrowBatch) {
return ArrowBatch->num_rows();
} else {
return Rows.size();
}
}
// Always true: although this type derives from TEventLocal, it supplies its own
// serialization routines (CalculateSerializedSize / SerializeToArcadiaStream / Load).
bool IsSerializable() const override {
return true;
}

bool IsEmpty() const {
return GetRowsCount() == 0;
}
// Size of the serialized form. InitRemote() must run first so that the protobuf
// mirror (Remote) exists; the size is then taken from that mirror.
ui32 CalculateSerializedSize() const override {
InitRemote();
return Remote->CalculateSerializedSizeCached();
}

bool IsSerializable() const override {
return true;
}
// Serializes the event by delegating to the protobuf mirror (Remote).
// InitRemote() is called first so the mirror exists before it is written to the chunker.
bool SerializeToArcadiaStream(NActors::TChunkSerializer* chunker) const override {
InitRemote();
return Remote->SerializeToArcadiaStream(chunker);
}

ui32 CalculateSerializedSize() const override {
InitRemote();
return Remote->CalculateSerializedSizeCached();
NKikimrDataEvents::EDataFormat GetDataFormat() const {
if (ArrowBatch != nullptr || SplittedBatches.size()) {
return NKikimrDataEvents::FORMAT_ARROW;
}

bool SerializeToArcadiaStream(NActors::TChunkSerializer* chunker) const override {
InitRemote();
return Remote->SerializeToArcadiaStream(chunker);
return NKikimrDataEvents::FORMAT_CELLVEC;
}


static NActors::IEventBase* Load(TEventSerializedData* data) {
auto pbEv = THolder<TEvRemoteScanData>(static_cast<TEvRemoteScanData*>(TEvRemoteScanData::Load(data)));
auto ev = MakeHolder<TEvScanData>(pbEv->Record.GetScanId(), pbEv->Record.GetGeneration());

ev->CpuTime = TDuration::MicroSeconds(pbEv->Record.GetCpuTimeUs());
ev->WaitTime = TDuration::MilliSeconds(pbEv->Record.GetWaitTimeMs());
ev->PageFault = pbEv->Record.GetPageFault();
ev->PageFaults = pbEv->Record.GetPageFaults();
ev->Finished = pbEv->Record.GetFinished();
ev->RequestedBytesLimitReached = pbEv->Record.GetRequestedBytesLimitReached();
ev->LastKey = TOwnedCellVec(TSerializedCellVec(pbEv->Record.GetLastKey()).GetCells());
if (pbEv->Record.HasAvailablePacks()) {
ev->AvailablePacks = pbEv->Record.GetAvailablePacks();
}

NKikimrDataEvents::EDataFormat GetDataFormat() const {
if (ArrowBatch != nullptr || SplittedBatches.size()) {
return NKikimrDataEvents::FORMAT_ARROW;
}
return NKikimrDataEvents::FORMAT_CELLVEC;
auto rows = pbEv->Record.GetRows();
ev->Rows.reserve(rows.size());
for (const auto& row : rows) {
ev->Rows.emplace_back(TSerializedCellVec(row).GetCells());
}


static NActors::IEventBase* Load(TEventSerializedData* data) {
auto pbEv = THolder<TEvRemoteScanData>(static_cast<TEvRemoteScanData *>(TEvRemoteScanData::Load(data)));
auto ev = MakeHolder<TEvScanData>(pbEv->Record.GetScanId(), pbEv->Record.GetGeneration());

ev->CpuTime = TDuration::MicroSeconds(pbEv->Record.GetCpuTimeUs());
ev->WaitTime = TDuration::MilliSeconds(pbEv->Record.GetWaitTimeMs());
ev->PageFault = pbEv->Record.GetPageFault();
ev->PageFaults = pbEv->Record.GetPageFaults();
ev->Finished = pbEv->Record.GetFinished();
ev->RequestedBytesLimitReached = pbEv->Record.GetRequestedBytesLimitReached();
ev->LastKey = TOwnedCellVec(TSerializedCellVec(pbEv->Record.GetLastKey()).GetCells());
if (pbEv->Record.HasAvailablePacks()) {
ev->AvailablePacks = pbEv->Record.GetAvailablePacks();
}

auto rows = pbEv->Record.GetRows();
ev->Rows.reserve(rows.size());
for (const auto& row: rows) {
ev->Rows.emplace_back(TSerializedCellVec(row).GetCells());
if (pbEv->Record.HasArrowBatch()) {
auto batch = pbEv->Record.GetArrowBatch();
auto schema = NArrow::DeserializeSchema(batch.GetSchema());
ev->ArrowBatch = NArrow::TStatusValidator::GetValid(arrow::Table::FromRecordBatches({ NArrow::DeserializeBatch(batch.GetBatch(), schema) }));
}
return ev.Release();
}

private:
void InitRemote() const {
if (!Remote) {
Remote = MakeHolder<TEvRemoteScanData>();

Remote->Record.SetScanId(ScanId);
Remote->Record.SetGeneration(Generation);
Remote->Record.SetCpuTimeUs(CpuTime.MicroSeconds());
Remote->Record.SetWaitTimeMs(WaitTime.MilliSeconds());
Remote->Record.SetPageFaults(PageFaults);
Remote->Record.SetFinished(Finished);
Remote->Record.SetRequestedBytesLimitReached(RequestedBytesLimitReached);
Remote->Record.SetPageFaults(PageFaults);
Remote->Record.SetPageFault(PageFault);
Remote->Record.SetLastKey(TSerializedCellVec::Serialize(LastKey));
if (AvailablePacks) {
Remote->Record.SetAvailablePacks(*AvailablePacks);
}

if (pbEv->Record.HasArrowBatch()) {
auto batch = pbEv->Record.GetArrowBatch();
auto schema = NArrow::DeserializeSchema(batch.GetSchema());
ev->ArrowBatch = NArrow::TStatusValidator::GetValid(arrow::Table::FromRecordBatches({NArrow::DeserializeBatch(batch.GetBatch(), schema)}));
switch (GetDataFormat()) {
case NKikimrDataEvents::FORMAT_UNSPECIFIED:
case NKikimrDataEvents::FORMAT_CELLVEC: {
Remote->Record.MutableRows()->Reserve(Rows.size());
for (const auto& row : Rows) {
Remote->Record.AddRows(TSerializedCellVec::Serialize(row));
}
break;
}
case NKikimrDataEvents::FORMAT_ARROW: {
Y_DEBUG_ABORT_UNLESS(ArrowBatch != nullptr);
auto* protoArrowBatch = Remote->Record.MutableArrowBatch();
protoArrowBatch->SetSchema(NArrow::SerializeSchema(*ArrowBatch->schema()));
protoArrowBatch->SetBatch(NArrow::SerializeBatchNoCompression(NArrow::ToBatch(ArrowBatch, true)));
break;
}
}
return ev.Release();
}
}
};

private:
void InitRemote() const {
if (!Remote) {
Remote = MakeHolder<TEvRemoteScanData>();
}

Remote->Record.SetScanId(ScanId);
Remote->Record.SetGeneration(Generation);
Remote->Record.SetCpuTimeUs(CpuTime.MicroSeconds());
Remote->Record.SetWaitTimeMs(WaitTime.MilliSeconds());
Remote->Record.SetPageFaults(PageFaults);
Remote->Record.SetFinished(Finished);
Remote->Record.SetRequestedBytesLimitReached(RequestedBytesLimitReached);
Remote->Record.SetPageFaults(PageFaults);
Remote->Record.SetPageFault(PageFault);
Remote->Record.SetLastKey(TSerializedCellVec::Serialize(LastKey));
if (AvailablePacks) {
Remote->Record.SetAvailablePacks(*AvailablePacks);
}
struct TEvKqpCompute {
using TEvRemoteScanData = NInternalImplementation::TEvRemoteScanData;

switch (GetDataFormat()) {
case NKikimrDataEvents::FORMAT_UNSPECIFIED:
case NKikimrDataEvents::FORMAT_CELLVEC: {
Remote->Record.MutableRows()->Reserve(Rows.size());
for (const auto& row: Rows) {
Remote->Record.AddRows(TSerializedCellVec::Serialize(row));
}
break;
}
case NKikimrDataEvents::FORMAT_ARROW: {
Y_DEBUG_ABORT_UNLESS(ArrowBatch != nullptr);
auto* protoArrowBatch = Remote->Record.MutableArrowBatch();
protoArrowBatch->SetSchema(NArrow::SerializeSchema(*ArrowBatch->schema()));
protoArrowBatch->SetBatch(NArrow::SerializeBatchNoCompression(NArrow::ToBatch(ArrowBatch, true)));
break;
}
}
}
}
};
using IShardScanStats = NInternalImplementation::IShardScanStats;

using TEvScanData = NInternalImplementation::TEvScanData;

struct TEvRemoteScanDataAck: public NActors::TEventPB<TEvRemoteScanDataAck, NKikimrKqp::TEvRemoteScanDataAck,
TKqpComputeEvents::EvRemoteScanDataAck> {
Expand Down Expand Up @@ -253,5 +264,4 @@ struct TEvKqpCompute {
TKqpComputeEvents::EvKillScanTablet> {};

};

} // namespace NKikimr::NKqp
6 changes: 3 additions & 3 deletions ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ static constexpr TDuration RL_MAX_BATCH_DELAY = TDuration::Seconds(50);

} // anonymous namespace

TKqpScanComputeActor::TKqpScanComputeActor(TComputeActorSchedulingOptions cpuOptions, const TActorId& executerId, ui64 txId, ui64 lockTxId, ui32 lockNodeId,
TKqpScanComputeActor::TKqpScanComputeActor(TComputeActorSchedulingOptions cpuOptions, const TActorId& executerId, ui64 txId, TMaybe<ui64> lockTxId, ui32 lockNodeId,
NDqProto::TDqTask* task, IDqAsyncIoFactory::TPtr asyncIoFactory,
const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits, NWilson::TTraceId traceId,
TIntrusivePtr<NActors::TProtoArenaHolder> arena)
Expand Down Expand Up @@ -149,14 +149,14 @@ void TKqpScanComputeActor::Handle(TEvScanExchange::TEvSendData::TPtr& ev) {
for (const auto& lock : msg.GetLocksInfo().Locks) {
Locks.insert(lock);
}
for (const auto& lock : msg.GetLocksInfo().Locks) {
for (const auto& lock : msg.GetLocksInfo().BrokenLocks) {
BrokenLocks.insert(lock);
}

auto guard = TaskRunner->BindAllocator();
if (!!msg.GetArrowBatch()) {
ScanData->AddData(NMiniKQL::TBatchDataAccessor(msg.GetArrowBatch(), std::move(msg.MutableDataIndexes())), msg.GetTabletId(), TaskRunner->GetHolderFactory());
} else {
} else if (!msg.GetRows().empty()) {
ScanData->AddData(std::move(msg.MutableRows()), msg.GetTabletId(), TaskRunner->GetHolderFactory());
}
if (IsQuotingEnabled()) {
Expand Down
Loading

0 comments on commit 1af55df

Please sign in to comment.