Skip to content

Commit

Permalink
fix test and correct normalizer conditions (#11504)
Browse files Browse the repository at this point in the history
  • Loading branch information
ivanmorozov333 authored Nov 12, 2024
1 parent a819f67 commit cf0394a
Show file tree
Hide file tree
Showing 5 changed files with 45 additions and 10 deletions.
1 change: 0 additions & 1 deletion .github/config/muted_ya.txt
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ ydb/core/keyvalue/ut_trace TKeyValueTracingTest.WriteSmall
ydb/core/kqp/ut/data_integrity KqpDataIntegrityTrails.Select
ydb/core/kqp/ut/data_integrity KqpDataIntegrityTrails.UpsertEvWrite
ydb/core/kqp/ut/data_integrity KqpDataIntegrityTrails.UpsertViaLegacyScripting-Streaming
ydb/core/tx/columnshard/engines/ut TColumnEngineTestLogs.IndexTtl
ydb/core/kqp/ut/olap KqpDecimalColumnShard.TestAggregation
ydb/core/kqp/ut/olap KqpDecimalColumnShard.TestFilterCompare
ydb/core/kqp/ut/olap KqpOlapBlobsSharing.BlobsSharingSplit1_1_clean_with_restarts
Expand Down
23 changes: 17 additions & 6 deletions ydb/core/tx/columnshard/engines/column_engine_logs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -515,15 +515,26 @@ std::shared_ptr<TSelectInfo> TColumnEngineForLogs::Select(
return out;
}

void TColumnEngineForLogs::OnTieringModified(
const std::shared_ptr<NColumnShard::TTiersManager>& manager, const NColumnShard::TTtl& ttl, const std::optional<ui64> pathId) {
if (!ActualizationStarted) {
for (auto&& i : GranulesStorage->GetTables()) {
i.second->StartActualizationIndex();
bool TColumnEngineForLogs::StartActualization(const THashMap<ui64, TTiering>& specialPathEviction) {
if (ActualizationStarted) {
return false;
}
for (auto&& i : GranulesStorage->GetTables()) {
i.second->StartActualizationIndex();
}
for (auto&& i : specialPathEviction) {
auto g = GetGranuleOptional(i.first);
if (g) {
g->RefreshTiering(i.second);
}
}

ActualizationStarted = true;
return true;
}

void TColumnEngineForLogs::OnTieringModified(
const std::shared_ptr<NColumnShard::TTiersManager>& manager, const NColumnShard::TTtl& ttl, const std::optional<ui64> pathId) {
StartActualization({});
AFL_VERIFY(manager);
THashMap<ui64, TTiering> tierings = manager->GetTiering();
ttl.AddTtls(tierings);
Expand Down
1 change: 1 addition & 0 deletions ydb/core/tx/columnshard/engines/column_engine_logs.h
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ class TColumnEngineForLogs: public IColumnEngine {
public:
virtual std::shared_ptr<ITxReader> BuildLoader(const std::shared_ptr<IBlobGroupSelector>& dsGroupSelector) override;
bool FinishLoading();
bool StartActualization(const THashMap<ui64, TTiering>& specialPathEviction);

virtual bool IsOverloadedByMetadata(const ui64 limit) const override {
return limit < TGranulesStat::GetSumMetadataMemoryPortionsSize();
Expand Down
26 changes: 26 additions & 0 deletions ydb/core/tx/columnshard/engines/ut/ut_logs_engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -432,7 +432,33 @@ bool Cleanup(TColumnEngineForLogs& engine, TTestDbWrapper& db, TSnapshot snap, u
return result;
}

namespace {
class TTestMetadataAccessorsSubscriber: public NOlap::IDataAccessorRequestsSubscriber {
private:
std::shared_ptr<IMetadataAccessorResultProcessor> Processor;
TColumnEngineForLogs& Engine;

virtual void DoOnRequestsFinished(TDataAccessorsResult&& result) override {
Processor->ApplyResult(std::move(result), Engine);
}

public:
TTestMetadataAccessorsSubscriber(const std::shared_ptr<IMetadataAccessorResultProcessor>& processor, TColumnEngineForLogs& engine)
: Processor(processor)
, Engine(engine) {
}
};

}

bool Ttl(TColumnEngineForLogs& engine, TTestDbWrapper& db, const THashMap<ui64, NOlap::TTiering>& pathEviction, ui32 expectedToDrop) {
engine.StartActualization(pathEviction);
std::vector<NOlap::TCSMetadataRequest> requests = engine.CollectMetadataRequests();
for (auto&& i : requests) {
i.GetRequest()->RegisterSubscriber(std::make_shared<TTestMetadataAccessorsSubscriber>(i.GetProcessor(), engine));
engine.FetchDataAccessors(i.GetRequest());
}

std::vector<std::shared_ptr<TTTLColumnEngineChanges>> vChanges = engine.StartTtl(pathEviction, EmptyDataLocksManager, 512 * 1024 * 1024);
AFL_VERIFY(vChanges.size() == 1)("count", vChanges.size());
auto changes = vChanges.front();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,9 +145,7 @@ TConclusion<std::vector<INormalizerTask::TPtr>> TNormalizer::DoInit(
if (!ready) {
return TConclusionStatus::Fail("Not ready");
}
if (!AppDataVerified().ColumnShardConfig.GetColumnChunksV0Usage()) {
return std::vector<INormalizerTask::TPtr>();
}
AFL_VERIFY(AppDataVerified().ColumnShardConfig.GetColumnChunksV0Usage());
THashMap<ui64, TPortionLoadContext> portions0;
THashSet<ui64> existPortions0;
THashMap<ui64, std::map<TFullChunkAddress, TColumnChunkLoadContext>> columns0;
Expand Down

0 comments on commit cf0394a

Please sign in to comment.