diff --git a/ydb/core/tx/columnshard/columnshard_impl.h b/ydb/core/tx/columnshard/columnshard_impl.h index 537ad9128897..89c34e6cebbe 100644 --- a/ydb/core/tx/columnshard/columnshard_impl.h +++ b/ydb/core/tx/columnshard/columnshard_impl.h @@ -101,6 +101,7 @@ class TWaitEraseTablesTxSubscriber; class TTxBlobsWritingFinished; class TTxBlobsWritingFailed; class TWriteTasksQueue; +class TWriteTask; namespace NLoading { class TInsertTableInitializer; @@ -230,6 +231,8 @@ class TColumnShard: public TActor, public NTabletFlatExecutor::TTa friend class NLoading::TInFlightReadsInitializer; friend class NLoading::TSpecialValuesInitializer; friend class NLoading::TTablesManagerInitializer; + friend class TWriteTasksQueue; + friend class TWriteTask; class TTxProgressTx; class TTxProposeCancel; diff --git a/ydb/core/tx/columnshard/tablet/write_queue.cpp b/ydb/core/tx/columnshard/tablet/write_queue.cpp index c8e36146f06e..3e8e1a22cea4 100644 --- a/ydb/core/tx/columnshard/tablet/write_queue.cpp +++ b/ydb/core/tx/columnshard/tablet/write_queue.cpp @@ -1,12 +1,14 @@ #include "write_queue.h" #include +#include +#include namespace NKikimr::NColumnShard { bool TWriteTask::CheckOverloadImmediate(TColumnShard* owner, const TActorContext& ctx) { auto overloadStatus = owner->CheckOverloadedImmediate(PathId); - if (overloadStatus == EOverloadStatus::None) { + if (overloadStatus == TColumnShard::EOverloadStatus::None) { return false; } owner->Counters.GetWritesMonitor()->OnFinishWrite(ArrowData->GetSize()); @@ -19,12 +21,12 @@ bool TWriteTask::CheckOverloadImmediate(TColumnShard* owner, const TActorContext } bool TWriteTask::Execute(TColumnShard* owner, const TActorContext& ctx) { - overloadStatus = owner->CheckOverloadedWait(PathId); - if (overloadStatus == EOverloadStatus::OverloadMetadata) { + auto overloadStatus = owner->CheckOverloadedWait(PathId); + if (overloadStatus == TColumnShard::EOverloadStatus::OverloadMetadata) { AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_WRITE)("event", "wait_overload")("status", overloadStatus); return false; } - AFL_VERIFY(overloadStatus == EOverloadStatus::None); + AFL_VERIFY(overloadStatus == TColumnShard::EOverloadStatus::None); owner->OperationsManager->RegisterLock(LockId, owner->Generation()); auto writeOperation = owner->OperationsManager->RegisterOperation( @@ -44,6 +46,10 @@ bool TWriteTask::Execute(TColumnShard* owner, const TActorContext& ctx) { return true; } +ui64 TWriteTask::GetSize() const { + return ArrowData->GetSize(); +} + bool TWriteTasksQueue::Drain(const bool onWakeup, const TActorContext& ctx) { if (onWakeup) { WriteTasksOverloadCheckerScheduled = false; diff --git a/ydb/core/tx/columnshard/tablet/write_queue.h b/ydb/core/tx/columnshard/tablet/write_queue.h index 81b94766114b..682cdbf7e823 100644 --- a/ydb/core/tx/columnshard/tablet/write_queue.h +++ b/ydb/core/tx/columnshard/tablet/write_queue.h @@ -2,10 +2,10 @@ #include #include #include -#include namespace NKikimr::NColumnShard { class TColumnShard; +class TArrowData; class TWriteTask: TMoveOnly { private: std::shared_ptr ArrowData; @@ -34,10 +34,13 @@ class TWriteTask: TMoveOnly { , Behaviour(behaviour) { } + ui64 GetSize() const; + const TMonotonic& GetCreatedMonotonic() const { return Created; } + bool CheckOverloadImmediate(TColumnShard* owner, const TActorContext& ctx); bool Execute(TColumnShard* owner, const TActorContext& ctx); };