Skip to content

Commit

Permalink
signals, optimizations, logging, inserted data expiration (#9200)
Browse files Browse the repository at this point in the history
  • Loading branch information
ivanmorozov333 authored Sep 16, 2024
1 parent 8761c91 commit 065294c
Show file tree
Hide file tree
Showing 23 changed files with 195 additions and 149 deletions.
2 changes: 1 addition & 1 deletion ydb/core/tx/columnshard/columnshard__write.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,7 @@ void TColumnShard::Handle(TEvColumnShard::TEvWrite::TPtr& ev, const TActorContex
<< Counters.GetWritesMonitor()->DebugString() << " at tablet " << TabletID());
writeData.MutableWriteMeta().SetWriteMiddle1StartInstant(TMonotonic::Now());
std::shared_ptr<NConveyor::ITask> task = std::make_shared<NOlap::TBuildBatchesTask>(
TabletID(), SelfId(), BufferizationWriteActorId, std::move(writeData), snapshotSchema, GetLastTxSnapshot());
TabletID(), SelfId(), BufferizationWriteActorId, std::move(writeData), snapshotSchema, GetLastTxSnapshot(), Counters.GetCSCounters().WritingCounters);
NConveyor::TInsertServiceOperator::AsyncTaskToExecute(task);
}
}
Expand Down
1 change: 1 addition & 0 deletions ydb/core/tx/columnshard/counters/columnshard.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ namespace NKikimr::NColumnShard {

TCSCounters::TCSCounters()
: TBase("CS")
, WritingCounters(std::make_shared<TWriteCounters>(*this))
, Initialization(*this)
, TxProgress(*this) {
StartBackgroundCount = TBase::GetDeriviative("StartBackground/Count");
Expand Down
31 changes: 25 additions & 6 deletions ydb/core/tx/columnshard/counters/columnshard.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,29 @@ enum class EWriteFailReason {
OverlimitReadBlobMemory /* "overlimit_read_blob_memory" */
};

class TWriteCounters: public TCommonCountersOwner {
private:
using TBase = TCommonCountersOwner;
NMonitoring::TDynamicCounters::TCounterPtr VolumeWriteData;
NMonitoring::THistogramPtr HistogramBytesWriteDataCount;
NMonitoring::THistogramPtr HistogramBytesWriteDataBytes;

public:
TWriteCounters(TCommonCountersOwner& owner)
: TBase(owner, "activity", "writing")
{
VolumeWriteData = TBase::GetDeriviative("Write/Incoming/Bytes");
HistogramBytesWriteDataCount = TBase::GetHistogram("Write/Incoming/ByBytes/Count", NMonitoring::ExponentialHistogram(18, 2, 100));
HistogramBytesWriteDataBytes = TBase::GetHistogram("Write/Incoming/ByBytes/Bytes", NMonitoring::ExponentialHistogram(18, 2, 100));
}

void OnIncomingData(const ui64 dataSize) const {
VolumeWriteData->Add(dataSize);
HistogramBytesWriteDataCount->Collect((i64)dataSize, 1);
HistogramBytesWriteDataBytes->Collect((i64)dataSize, dataSize);
}
};

class TCSCounters: public TCommonCountersOwner {
private:
using TBase = TCommonCountersOwner;
Expand Down Expand Up @@ -72,7 +95,9 @@ class TCSCounters: public TCommonCountersOwner {
NMonitoring::TDynamicCounters::TCounterPtr WriteRequests;
THashMap<EWriteFailReason, NMonitoring::TDynamicCounters::TCounterPtr> FailedWriteRequests;
NMonitoring::TDynamicCounters::TCounterPtr SuccessWriteRequests;

public:
const std::shared_ptr<TWriteCounters> WritingCounters;
const TCSInitialization Initialization;
TTxProgressCounters TxProgress;

Expand All @@ -89,7 +114,6 @@ class TCSCounters: public TCommonCountersOwner {

void OnWritePutBlobsSuccess(const TDuration d) const {
HistogramSuccessWritePutBlobsDurationMs->Collect(d.MilliSeconds());
WritePutBlobsCount->Sub(1);
}

void OnWriteMiddle1PutBlobsSuccess(const TDuration d) const {
Expand Down Expand Up @@ -118,11 +142,6 @@ class TCSCounters: public TCommonCountersOwner {

void OnWritePutBlobsFail(const TDuration d) const {
HistogramFailedWritePutBlobsDurationMs->Collect(d.MilliSeconds());
WritePutBlobsCount->Sub(1);
}

void OnWritePutBlobsStart() const {
WritePutBlobsCount->Add(1);
}

void OnWriteTxComplete(const TDuration d) const {
Expand Down
6 changes: 2 additions & 4 deletions ydb/core/tx/columnshard/engines/column_engine_logs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -361,8 +361,7 @@ std::shared_ptr<TCleanupPortionsColumnEngineChanges> TColumnEngineForLogs::Start
limitExceeded = true;
break;
}
const auto inserted = uniquePortions.emplace(info->GetAddress()).second;
Y_ABORT_UNLESS(inserted);
AFL_VERIFY(uniquePortions.emplace(info->GetAddress()).second);
changes->PortionsToDrop.push_back(*info);
++portionsFromDrop;
}
Expand All @@ -381,8 +380,7 @@ std::shared_ptr<TCleanupPortionsColumnEngineChanges> TColumnEngineForLogs::Start
++i;
continue;
}
const auto inserted = uniquePortions.emplace(it->second[i].GetAddress()).second;
if (inserted) {
if (uniquePortions.emplace(it->second[i].GetAddress()).second) {
AFL_VERIFY(it->second[i].CheckForCleanup(snapshot))("p_snapshot", it->second[i].GetRemoveSnapshotOptional())("snapshot", snapshot);
if (txSize + it->second[i].GetTxVolume() < txSizeLimit || changes->PortionsToDrop.empty()) {
txSize += it->second[i].GetTxVolume();
Expand Down
7 changes: 5 additions & 2 deletions ydb/core/tx/columnshard/engines/insert_table/insert_table.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ std::vector<TCommittedBlob> TInsertTable::Read(ui64 pathId, const std::optional<
}

std::vector<TCommittedBlob> result;
result.reserve(pInfo->GetCommitted().size() + pInfo->GetInserted().size());
result.reserve(pInfo->GetCommitted().size() + Summary.GetInserted().size());

for (const auto& data : pInfo->GetCommitted()) {
if (lockId || data.GetSnapshot() <= reqSnapshot) {
Expand All @@ -137,7 +137,10 @@ std::vector<TCommittedBlob> TInsertTable::Read(ui64 pathId, const std::optional<
}
}
if (lockId) {
for (const auto& [writeId, data] : pInfo->GetInserted()) {
for (const auto& [writeId, data] : Summary.GetInserted()) {
if (data.GetPathId() != pathId) {
continue;
}
auto start = data.GetMeta().GetFirstPK(pkSchema);
auto finish = data.GetMeta().GetLastPK(pkSchema);
if (pkRangesFilter && pkRangesFilter->IsPortionInPartialUsage(start, finish) == TPKRangeFilter::EUsageClass::DontUsage) {
Expand Down
6 changes: 1 addition & 5 deletions ydb/core/tx/columnshard/engines/insert_table/insert_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,6 @@ namespace NKikimr::NOlap {
class TPKRangesFilter;
class IDbWrapper;

/// Use one table for inserted and committed blobs:
/// !Committed => {PlanStep, WriteTxId} are {0, WriteId}
/// Committed => {PlanStep, WriteTxId} are {PlanStep, TxId}

class TInsertTableAccessor {
protected:
TInsertionSummary Summary;
Expand Down Expand Up @@ -76,7 +72,7 @@ class TInsertTableAccessor {
const THashMap<TInsertWriteId, TInsertedData>& GetAborted() const {
return Summary.GetAborted();
}
const THashMap<TInsertWriteId, TInsertedData>& GetInserted() const {
const TInsertedContainer& GetInserted() const {
return Summary.GetInserted();
}
const TInsertionSummary::TCounters& GetCountersPrepared() const {
Expand Down
4 changes: 0 additions & 4 deletions ydb/core/tx/columnshard/engines/insert_table/path_info.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,4 @@ NKikimr::NOlap::TPathInfoIndexPriority TPathInfo::GetIndexationPriority() const
}
}

// Forwards to the owning summary and returns its inserted-data map by reference.
// NOTE(review): the result is whatever Summary->GetInserted() exposes; no
// filtering by this path is applied here.
const THashMap<TInsertWriteId, TInsertedData>& TPathInfo::GetInserted() const {
return Summary->GetInserted();
}

}
2 changes: 0 additions & 2 deletions ydb/core/tx/columnshard/engines/insert_table/path_info.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,6 @@ class TPathInfo: public TMoveOnly {
return Committed.empty() && !InsertedSize;
}

const THashMap<TInsertWriteId, TInsertedData>& GetInserted() const;

void AddInsertedSize(const i64 size, const ui64 overloadLimit);

explicit TPathInfo(TInsertionSummary& summary, const ui64 pathId);
Expand Down
61 changes: 10 additions & 51 deletions ydb/core/tx/columnshard/engines/insert_table/rt_insertion.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -89,37 +89,8 @@ void TInsertionSummary::OnEraseInserted(TPathInfo& pathInfo, const ui64 dataSize
AFL_VERIFY(Counters.Inserted.GetDataSize() == (i64)StatsPrepared.Bytes);
}

// Collects the write ids of every inserted entry whose path id equals pathId.
// Scans the whole Inserted collection; returns an empty set when nothing matches.
THashSet<TInsertWriteId> TInsertionSummary::GetInsertedByPathId(const ui64 pathId) const {
    THashSet<TInsertWriteId> matchedIds;
    for (const auto& [insertWriteId, insertedData] : Inserted) {
        if (insertedData.GetPathId() != pathId) {
            continue;
        }
        matchedIds.insert(insertWriteId);
    }
    return matchedIds;
}

THashSet<TInsertWriteId> TInsertionSummary::GetExpiredInsertions(const TInstant timeBorder, const ui64 limit) const {
if (timeBorder < MinInsertedTs) {
return {};
}

THashSet<TInsertWriteId> toAbort;
TInstant newMin = TInstant::Max();
for (auto& [writeId, data] : Inserted) {
const TInstant dataInsertTs = data.GetMeta().GetDirtyWriteTime();
if (data.IsNotAbortable()) {
continue;
}
if (dataInsertTs < timeBorder && toAbort.size() < limit) {
toAbort.insert(writeId);
} else {
newMin = Min(newMin, dataInsertTs);
}
}
MinInsertedTs = (toAbort.size() == Inserted.size()) ? TInstant::Zero() : newMin;
return toAbort;
return Inserted.GetExpired(timeBorder, limit);
}

bool TInsertionSummary::EraseAborted(const TInsertWriteId writeId) {
Expand Down Expand Up @@ -173,33 +144,21 @@ const NKikimr::NOlap::TInsertedData* TInsertionSummary::AddAborted(TInsertedData
}

std::optional<NKikimr::NOlap::TInsertedData> TInsertionSummary::ExtractInserted(const TInsertWriteId id) {
auto it = Inserted.find(id);
if (it == Inserted.end()) {
return {};
} else {
auto pathInfo = GetPathInfoOptional(it->second.GetPathId());
auto result = Inserted.ExtractOptional(id);
if (result) {
auto pathInfo = GetPathInfoOptional(result->GetPathId());
if (pathInfo) {
OnEraseInserted(*pathInfo, it->second.BlobSize());
OnEraseInserted(*pathInfo, result->BlobSize());
}
std::optional<TInsertedData> result = std::move(it->second);
Inserted.erase(it);
return result;
}
return result;
}

const NKikimr::NOlap::TInsertedData* TInsertionSummary::AddInserted(TInsertedData&& data, const bool load /*= false*/) {
const TInsertWriteId writeId = data.GetInsertWriteId();
const ui32 dataSize = data.BlobSize();
const ui64 pathId = data.GetPathId();
auto insertInfo = Inserted.emplace(writeId, std::move(data));
AFL_VERIFY_DEBUG(!Aborted.contains(writeId));
if (insertInfo.second) {
OnNewInserted(GetPathInfo(pathId), dataSize, load);
return &insertInfo.first->second;
} else {
Counters.Inserted.SkipAdd(dataSize);
return nullptr;
}
auto* insertInfo = Inserted.AddVerified(std::move(data));
AFL_VERIFY_DEBUG(!Aborted.contains(insertInfo->GetInsertWriteId()));
OnNewInserted(GetPathInfo(insertInfo->GetPathId()), insertInfo->BlobSize(), load);
return insertInfo;
}

}
121 changes: 113 additions & 8 deletions ydb/core/tx/columnshard/engines/insert_table/rt_insertion.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,114 @@

namespace NKikimr::NOlap {
class IBlobsDeclareRemovingAction;

class TInsertedDataInstant {
private:
const TInsertedData* Data;
const TInstant WriteTime;

public:
TInsertedDataInstant(const TInsertedData& data)
: Data(&data)
, WriteTime(Data->GetMeta().GetDirtyWriteTime())
{

}

const TInsertedData& GetData() const {
return *Data;
}
TInstant GetWriteTime() const {
return WriteTime;
}

bool operator<(const TInsertedDataInstant& item) const {
if (WriteTime == item.WriteTime) {
return Data->GetInsertWriteId() < item.Data->GetInsertWriteId();
} else {
return WriteTime < item.WriteTime;
}
}
};

// Container of inserted data addressed by insert write id, with a secondary
// index ordered by (write time, write id) used to find expired entries without
// scanning the whole map.
//
// InsertedByWriteTime stores pointers to the values held in Inserted, so both
// structures must always be modified together (see AddVerified/ExtractOptional).
// NOTE(review): this relies on THashMap keeping element addresses stable while
// other keys are inserted or erased — confirm for the THashMap in use.
class TInsertedContainer {
private:
    THashMap<TInsertWriteId, TInsertedData> Inserted;
    std::set<TInsertedDataInstant> InsertedByWriteTime;

public:
    // Number of stored entries.
    size_t size() const {
        return Inserted.size();
    }

    // True when an entry with this write id is stored.
    bool contains(const TInsertWriteId id) const {
        return Inserted.contains(id);
    }

    // Read-only iteration over (write id, data) pairs in hash-map order.
    THashMap<TInsertWriteId, TInsertedData>::const_iterator begin() const {
        return Inserted.begin();
    }

    THashMap<TInsertWriteId, TInsertedData>::const_iterator end() const {
        return Inserted.end();
    }

    // Returns up to `limit` write ids of abortable entries whose write time is
    // not later than timeBorder (an entry written exactly at timeBorder is
    // included), taken in ascending write-time order.
    // Entries marked as not abortable are skipped and do not count against the limit.
    THashSet<TInsertWriteId> GetExpired(const TInstant timeBorder, const ui64 limit) const {
        THashSet<TInsertWriteId> result;
        // The limit is checked only after an id has been added below, so a zero
        // limit must be handled up front; otherwise one id would still be returned.
        if (limit == 0) {
            return result;
        }
        for (const auto& item : InsertedByWriteTime) {
            // The index is sorted by write time: nothing after this point can be expired.
            if (timeBorder < item.GetWriteTime()) {
                break;
            }
            if (item.GetData().IsNotAbortable()) {
                continue;
            }
            result.emplace(item.GetData().GetInsertWriteId());
            if (limit <= result.size()) {
                break;
            }
        }
        return result;
    }

    // Stores the data under its own insert write id and indexes it by write time.
    // Verifies that no entry with the same write id existed before.
    // Returns a pointer to the stored element.
    TInsertedData* AddVerified(TInsertedData&& data) {
        // Read the id before the move below leaves `data` in a moved-from state.
        const TInsertWriteId writeId = data.GetInsertWriteId();
        auto itInsertion = Inserted.emplace(writeId, std::move(data));
        AFL_VERIFY(itInsertion.second);
        auto* dataPtr = &itInsertion.first->second;
        InsertedByWriteTime.emplace(TInsertedDataInstant(*dataPtr));
        return dataPtr;
    }

    // Returns the entry for the write id, or nullptr when it is absent.
    const TInsertedData* GetOptional(const TInsertWriteId id) const {
        auto it = Inserted.find(id);
        if (it == Inserted.end()) {
            return nullptr;
        } else {
            return &it->second;
        }
    }

    // Mutable variant of GetOptional.
    // The time index caches the write time and compares by write id, so callers
    // must not change those two properties through the returned pointer.
    TInsertedData* MutableOptional(const TInsertWriteId id) {
        auto it = Inserted.find(id);
        if (it == Inserted.end()) {
            return nullptr;
        } else {
            return &it->second;
        }
    }

    // Removes the entry for the write id from both structures and returns it,
    // or std::nullopt when it is absent.
    std::optional<TInsertedData> ExtractOptional(const TInsertWriteId id) {
        auto it = Inserted.find(id);
        if (it == Inserted.end()) {
            return std::nullopt;
        }
        // Drop the index record first: its comparator dereferences the stored
        // element, which is moved from and erased right after.
        AFL_VERIFY(InsertedByWriteTime.erase(TInsertedDataInstant(it->second)));
        TInsertedData result = std::move(it->second);
        Inserted.erase(it);
        return result;
    }
};

class TInsertionSummary {
public:
struct TCounters {
Expand All @@ -22,9 +130,8 @@ class TInsertionSummary {
TCounters StatsCommitted;
const NColumnShard::TInsertTableCounters Counters;

THashMap<TInsertWriteId, TInsertedData> Inserted;
TInsertedContainer Inserted;
THashMap<TInsertWriteId, TInsertedData> Aborted;
mutable TInstant MinInsertedTs = TInstant::Zero();

std::map<TPathInfoIndexPriority, std::set<const TPathInfo*>> Priorities;
THashMap<ui64, TPathInfo> PathInfo;
Expand Down Expand Up @@ -57,18 +164,16 @@ class TInsertionSummary {
}

void MarkAsNotAbortable(const TInsertWriteId writeId) {
auto it = Inserted.find(writeId);
if (it == Inserted.end()) {
auto* data = Inserted.MutableOptional(writeId);
if (!data) {
return;
}
it->second.MarkAsNotAbortable();
data->MarkAsNotAbortable();
}

THashSet<TInsertWriteId> GetInsertedByPathId(const ui64 pathId) const;

THashSet<TInsertWriteId> GetExpiredInsertions(const TInstant timeBorder, const ui64 limit) const;

const THashMap<TInsertWriteId, TInsertedData>& GetInserted() const {
const TInsertedContainer& GetInserted() const {
return Inserted;
}
const THashMap<TInsertWriteId, TInsertedData>& GetAborted() const {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ TPortionMeta TPortionMetaConstructor::Build() {

bool TPortionMetaConstructor::LoadMetadata(const NKikimrTxColumnShard::TIndexPortionMeta& portionMeta, const TIndexInfo& indexInfo) {
if (!!Produced) {
AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("event", "DeserializeFromProto")("error", "parsing duplication");
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "DeserializeFromProto")("error", "parsing duplication");
return true;
}
if (portionMeta.GetTierName()) {
Expand Down
Loading

0 comments on commit 065294c

Please sign in to comment.