Skip to content

Commit

Permalink
correct synchronization after tablet reboot (#12296)
Browse files Browse the repository at this point in the history
  • Loading branch information
ivanmorozov333 authored Dec 5, 2024
1 parent e1bc51a commit a8ab3a2
Show file tree
Hide file tree
Showing 17 changed files with 115 additions and 21 deletions.
2 changes: 1 addition & 1 deletion ydb/core/kqp/ut/olap/indexes_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,7 @@ Y_UNIT_TEST_SUITE(KqpOlapIndexes) {
CompareYson(result, R"([[20000u;]])");
}

AFL_VERIFY(updatesCount + 3 /*tablets count*/ * 1 /*normalizers*/ ==
AFL_VERIFY(updatesCount + 6 ==
(ui64)csController->GetActualizationRefreshSchemeCount().Val())(
"updates", updatesCount)("count",
csController->GetActualizationRefreshSchemeCount().Val());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,16 @@ class TTieringProcessContext {
return Tasks;
}

TString DebugString() const {
TStringBuilder result;
result << "{";
for (auto&& i : Tasks) {
result << i.first.DebugString() << ":" << i.second.size() << ";";
}
result << "}";
return result;
}

bool AddPortion(const std::shared_ptr<const TPortionInfo>& info, TPortionEvictionFeatures&& features, const std::optional<TDuration> dWait);

bool IsRWAddressAvailable(const TRWAddress& address) const {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,14 @@ class TController {

public:
void StartActualization(const NActualizer::TRWAddress& address) {
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_ACTUALIZATION)("event", "actualization_start")("count", ActualizationsInProgress[address])(
"limit", GetLimitForAddress(address))("rw", address.DebugString());
AFL_VERIFY(++ActualizationsInProgress[address] <= (i32)GetLimitForAddress(address));
}

void FinishActualization(const NActualizer::TRWAddress& address) {
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_ACTUALIZATION)("event", "actualization_finished")("count", ActualizationsInProgress[address])(
"limit", GetLimitForAddress(address))("rw", address.DebugString());
AFL_VERIFY(--ActualizationsInProgress[address] >= 0);
}

Expand Down
3 changes: 2 additions & 1 deletion ydb/core/tx/columnshard/engines/changes/cleanup_portions.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@

namespace NKikimr::NOlap {

class TCleanupPortionsColumnEngineChanges: public TColumnEngineChanges {
class TCleanupPortionsColumnEngineChanges: public TColumnEngineChanges,
public NColumnShard::TMonitoringObjectsCounter<TCleanupPortionsColumnEngineChanges> {
private:
using TBase = TColumnEngineChanges;
THashMap<TString, std::vector<std::shared_ptr<TPortionInfo>>> StoragePortions;
Expand Down
3 changes: 2 additions & 1 deletion ydb/core/tx/columnshard/engines/changes/cleanup_tables.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@

namespace NKikimr::NOlap {

class TCleanupTablesColumnEngineChanges: public TColumnEngineChanges {
class TCleanupTablesColumnEngineChanges: public TColumnEngineChanges,
public NColumnShard::TMonitoringObjectsCounter<TCleanupTablesColumnEngineChanges> {
private:
using TBase = TColumnEngineChanges;
protected:
Expand Down
3 changes: 2 additions & 1 deletion ydb/core/tx/columnshard/engines/changes/general_compaction.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@

namespace NKikimr::NOlap::NCompaction {

class TGeneralCompactColumnEngineChanges: public TCompactColumnEngineChanges {
class TGeneralCompactColumnEngineChanges: public TCompactColumnEngineChanges,
public NColumnShard::TMonitoringObjectsCounter<TGeneralCompactColumnEngineChanges> {
private:
YDB_ACCESSOR(ui64, PortionExpectedSize, 1.5 * (1 << 20));
using TBase = TCompactColumnEngineChanges;
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/tx/columnshard/engines/changes/indexation.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

namespace NKikimr::NOlap {

class TInsertColumnEngineChanges: public TChangesWithAppend {
class TInsertColumnEngineChanges: public TChangesWithAppend, public NColumnShard::TMonitoringObjectsCounter<TInsertColumnEngineChanges> {
private:
using TBase = TChangesWithAppend;
std::vector<TCommittedData> DataToIndex;
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/tx/columnshard/engines/changes/ttl.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

namespace NKikimr::NOlap {

class TTTLColumnEngineChanges: public TChangesWithAppend {
class TTTLColumnEngineChanges: public TChangesWithAppend, public NColumnShard::TMonitoringObjectsCounter<TTTLColumnEngineChanges> {
private:
using TBase = TChangesWithAppend;

Expand Down
6 changes: 4 additions & 2 deletions ydb/core/tx/columnshard/engines/column_engine_logs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -408,7 +408,7 @@ std::shared_ptr<TCleanupPortionsColumnEngineChanges> TColumnEngineForLogs::Start
std::vector<std::shared_ptr<TTTLColumnEngineChanges>> TColumnEngineForLogs::StartTtl(const THashMap<ui64, TTiering>& pathEviction,
const std::shared_ptr<NDataLocks::TManager>& dataLocksManager, const ui64 memoryUsageLimit) noexcept {
AFL_VERIFY(dataLocksManager);
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "StartTtl")("external", pathEviction.size());
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_ACTUALIZATION)("event", "StartTtl")("external", pathEviction.size());

TSaverContext saverContext(StoragesManager);
NActualizer::TTieringProcessContext context(memoryUsageLimit, saverContext, dataLocksManager, VersionedIndex, SignalCounters, ActualizationController);
Expand All @@ -434,10 +434,12 @@ std::vector<std::shared_ptr<TTTLColumnEngineChanges>> TColumnEngineForLogs::Star
i.second->BuildActualizationTasks(context, actualizationLag);
}
} else {
AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "StartTtl")("skip", "not_ready_tiers");
AFL_WARN(NKikimrServices::TX_COLUMNSHARD_ACTUALIZATION)("event", "StartTtl")("skip", "not_ready_tiers");
}
std::vector<std::shared_ptr<TTTLColumnEngineChanges>> result;
AFL_WARN(NKikimrServices::TX_COLUMNSHARD_ACTUALIZATION)("event", "StartTtl")("rw_tasks_count", context.GetTasks().size());
for (auto&& i : context.GetTasks()) {
AFL_WARN(NKikimrServices::TX_COLUMNSHARD_ACTUALIZATION)("event", "StartTtl")("rw", i.first.DebugString())("count", i.second.size());
for (auto&& t : i.second) {
SignalCounters.OnActualizationTask(t.GetTask()->GetPortionsToEvictCount(), t.GetTask()->GetPortionsToRemoveSize());
result.emplace_back(t.GetTask());
Expand Down
9 changes: 5 additions & 4 deletions ydb/core/tx/columnshard/engines/scheme/column/info.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,21 +64,22 @@ std::vector<std::shared_ptr<NKikimr::NOlap::IPortionDataChunk>> TSimpleColumnInf
AFL_VERIFY(Loader);
const auto checkNeedActualize = [&]() {
if (!Serializer.IsEqualTo(sourceColumnFeatures.Serializer)) {
AFL_NOTICE(NKikimrServices::TX_COLUMNSHARD)("event", "actualization")("reason", "serializer")
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_ACTUALIZATION)("event", "actualization")("reason", "serializer")
("from", sourceColumnFeatures.Serializer.SerializeToProto().DebugString())
("to", Serializer.SerializeToProto().DebugString());
return true;
}
if (!Loader->IsEqualTo(*sourceColumnFeatures.Loader)) {
AFL_NOTICE(NKikimrServices::TX_COLUMNSHARD)("event", "actualization")("reason", "loader");
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_ACTUALIZATION)("event", "actualization")("reason", "loader");
return true;
}
if (!!DictionaryEncoding != !!sourceColumnFeatures.DictionaryEncoding) {
AFL_NOTICE(NKikimrServices::TX_COLUMNSHARD)("event", "actualization")("reason", "dictionary")("from", !!sourceColumnFeatures.DictionaryEncoding)("to", !!DictionaryEncoding);
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_ACTUALIZATION)("event", "actualization")("reason", "dictionary")(
"from", !!sourceColumnFeatures.DictionaryEncoding)("to", !!DictionaryEncoding);
return true;
}
if (!!DictionaryEncoding && !DictionaryEncoding->IsEqualTo(*sourceColumnFeatures.DictionaryEncoding)) {
AFL_NOTICE(NKikimrServices::TX_COLUMNSHARD)("event", "actualization")("reason", "dictionary_encoding")
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_ACTUALIZATION)("event", "actualization")("reason", "dictionary_encoding")
("from", sourceColumnFeatures.DictionaryEncoding->SerializeToProto().DebugString())
("to", DictionaryEncoding->SerializeToProto().DebugString())
;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ class TGranuleActualizationIndex {
public:
std::vector<TCSMetadataRequest> CollectMetadataRequests(const THashMap<ui64, TPortionInfo::TPtr>& portions);

bool IsStarted() const {
return Actualizers.size();
}

void Start();
TGranuleActualizationIndex(const ui64 pathId, const TVersionedIndex& versionedIndex);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,31 @@ class TSchemeGlobalCounters: public NColumnShard::TCommonCountersOwner {

std::shared_ptr<NColumnShard::TValueAggregationAgent> QueueSizeInternalWrite;
std::shared_ptr<NColumnShard::TValueAggregationAgent> QueueSizeExternalWrite;

NMonitoring::TDynamicCounters::TCounterPtr Extracts;
NMonitoring::TDynamicCounters::TCounterPtr SkipNotOptimized;
NMonitoring::TDynamicCounters::TCounterPtr SkipNotReadyWrite;
NMonitoring::TDynamicCounters::TCounterPtr SkipPortionNotActualizable;
NMonitoring::TDynamicCounters::TCounterPtr EmptyTargetSchema;
NMonitoring::TDynamicCounters::TCounterPtr RefreshEmpty;
NMonitoring::TDynamicCounters::TCounterPtr SkipPortionToRemove;
NMonitoring::TDynamicCounters::TCounterPtr RefreshValue;
NMonitoring::TDynamicCounters::TCounterPtr AddPortion;
NMonitoring::TDynamicCounters::TCounterPtr RemovePortion;

public:
TSchemeGlobalCounters()
: TBase("SchemeActualizer")
{
, Extracts(TBase::GetDeriviative("Extracts/Count"))
, SkipNotOptimized(TBase::GetDeriviative("SkipNotOptimized/Count"))
, SkipNotReadyWrite(TBase::GetDeriviative("SkipNotReadyWrite/Count"))
, SkipPortionNotActualizable(TBase::GetDeriviative("SkipPortionNotActualizable/Count"))
, EmptyTargetSchema(TBase::GetDeriviative("EmptyTargetSchema/Count"))
, RefreshEmpty(TBase::GetDeriviative("RefreshEmpty/Count"))
, SkipPortionToRemove(TBase::GetDeriviative("SkipPortionToRemove/Count"))
, RefreshValue(TBase::GetDeriviative("RefreshValue/Count"))
, AddPortion(TBase::GetDeriviative("AddPortion/Count"))
, RemovePortion(TBase::GetDeriviative("RemovePortion/Count")) {
QueueSizeExternalWrite = TBase::GetValueAutoAggregations("Granule/Scheme/Actualization/QueueSize/ExternalWrite");
QueueSizeInternalWrite = TBase::GetValueAutoAggregations("Granule/Scheme/Actualization/QueueSize/InternalWrite");
}
Expand All @@ -28,7 +49,36 @@ class TSchemeGlobalCounters: public NColumnShard::TCommonCountersOwner {
static std::shared_ptr<NColumnShard::TValueAggregationClient> BuildQueueSizeInternalWrite() {
return Singleton<TSchemeGlobalCounters>()->QueueSizeInternalWrite->GetClient();
}

static void OnAddPortion() {
Singleton<TSchemeGlobalCounters>()->AddPortion->Add(1);
}
static void OnRemovePortion() {
Singleton<TSchemeGlobalCounters>()->RemovePortion->Add(1);
}
static void OnSkipPortionNotActualizable() {
Singleton<TSchemeGlobalCounters>()->SkipPortionNotActualizable->Add(1);
}
static void OnEmptyTargetSchema() {
Singleton<TSchemeGlobalCounters>()->EmptyTargetSchema->Add(1);
}
static void OnRefreshEmpty() {
Singleton<TSchemeGlobalCounters>()->RefreshEmpty->Add(1);
}
static void OnSkipPortionToRemove() {
Singleton<TSchemeGlobalCounters>()->SkipPortionToRemove->Add(1);
}
static void OnRefreshValue() {
Singleton<TSchemeGlobalCounters>()->RefreshValue->Add(1);
}
static void OnExtract() {
Singleton<TSchemeGlobalCounters>()->Extracts->Add(1);
}
static void OnSkipNotOptimized() {
Singleton<TSchemeGlobalCounters>()->SkipNotOptimized->Add(1);
}
static void OnSkipNotReadyWrite() {
Singleton<TSchemeGlobalCounters>()->SkipNotReadyWrite->Add(1);
}
};

class TSchemeCounters {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ std::optional<NKikimr::NOlap::NActualizer::TSchemeActualizer::TFullActualization

void TSchemeActualizer::DoAddPortion(const TPortionInfo& info, const TAddExternalContext& addContext) {
if (!TargetSchema) {
TSchemeGlobalCounters::OnEmptyTargetSchema();
return;
}
if (!addContext.GetPortionExclusiveGuarantee()) {
Expand All @@ -34,8 +35,10 @@ void TSchemeActualizer::DoAddPortion(const TPortionInfo& info, const TAddExterna
}
auto actualizationInfo = BuildActualizationInfo(info);
if (!actualizationInfo) {
TSchemeGlobalCounters::OnSkipPortionNotActualizable();
return;
}
TSchemeGlobalCounters::OnAddPortion();
NYDBTest::TControllers::GetColumnShardController()->AddPortionForActualizer(1);
AFL_VERIFY(PortionsToActualizeScheme[actualizationInfo->GetAddress()].emplace(info.GetPortionId()).second);
AFL_VERIFY(PortionsInfo.emplace(info.GetPortionId(), actualizationInfo->ExtractFindId()).second);
Expand All @@ -44,6 +47,7 @@ void TSchemeActualizer::DoAddPortion(const TPortionInfo& info, const TAddExterna
void TSchemeActualizer::DoRemovePortion(const ui64 portionId) {
auto it = PortionsInfo.find(portionId);
if (it == PortionsInfo.end()) {
TSchemeGlobalCounters::OnSkipPortionToRemove();
return;
}
auto itAddress = PortionsToActualizeScheme.find(it->second.GetRWAddress());
Expand All @@ -53,19 +57,25 @@ void TSchemeActualizer::DoRemovePortion(const ui64 portionId) {
if (itAddress->second.empty()) {
PortionsToActualizeScheme.erase(itAddress);
}
TSchemeGlobalCounters::OnRemovePortion();
PortionsInfo.erase(it);
}

void TSchemeActualizer::DoExtractTasks(TTieringProcessContext& tasksContext, const TExternalTasksContext& externalContext, TInternalTasksContext& /*internalContext*/) {
THashSet<ui64> portionsToRemove;
TSchemeGlobalCounters::OnExtract();
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_ACTUALIZATION)("rw_count", PortionsToActualizeScheme.size());
for (auto&& [address, portions] : PortionsToActualizeScheme) {
if (!tasksContext.IsRWAddressAvailable(address)) {
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_ACTUALIZATION)("event", "skip_not_ready_for_write");
TSchemeGlobalCounters::OnSkipNotReadyWrite();
continue;
}
for (auto&& portionId : portions) {
auto portion = externalContext.GetPortionVerified(portionId);
if (!address.WriteIs(NBlobOperations::TGlobal::DefaultStorageId) && !address.WriteIs(NTiering::NCommon::DeleteTierName)) {
if (!portion->HasRuntimeFeature(TPortionInfo::ERuntimeFeature::Optimized)) {
TSchemeGlobalCounters::OnSkipNotOptimized();
continue;
}
}
Expand All @@ -76,6 +86,7 @@ void TSchemeActualizer::DoExtractTasks(TTieringProcessContext& tasksContext, con
features.SetTargetTierName(portion->GetTierNameDef(IStoragesManager::DefaultStorageId));

if (!tasksContext.AddPortion(portion, std::move(features), {})) {
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_ACTUALIZATION)("event", "cannot_add_portion")("context", tasksContext.DebugString());
break;
} else {
portionsToRemove.emplace(portion->GetPortionId());
Expand All @@ -97,14 +108,16 @@ void TSchemeActualizer::DoExtractTasks(TTieringProcessContext& tasksContext, con
}
Counters.QueueSizeInternalWrite->SetValue(waitQueueInternal);
Counters.QueueSizeExternalWrite->SetValue(waitQueueExternal);

AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_ACTUALIZATION)("internal_queue", waitQueueInternal)("external_queue", waitQueueExternal);
}

void TSchemeActualizer::Refresh(const TAddExternalContext& externalContext) {
TargetSchema = VersionedIndex.GetLastCriticalSchema();
if (!TargetSchema) {
TSchemeGlobalCounters::OnRefreshEmpty();
AFL_VERIFY(PortionsInfo.empty());
} else {
TSchemeGlobalCounters::OnRefreshValue();
NYDBTest::TControllers::GetColumnShardController()->AddPortionForActualizer(-1 * PortionsInfo.size());
PortionsInfo.clear();
PortionsToActualizeScheme.clear();
Expand All @@ -114,4 +127,10 @@ void TSchemeActualizer::Refresh(const TAddExternalContext& externalContext) {
}
}

TSchemeActualizer::TSchemeActualizer(const ui64 pathId, const TVersionedIndex& versionedIndex)
: PathId(pathId)
, VersionedIndex(versionedIndex) {
Y_UNUSED(PathId);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -63,11 +63,7 @@ class TSchemeActualizer: public IActualizer {
public:
void Refresh(const TAddExternalContext& externalContext);

TSchemeActualizer(const ui64 pathId, const TVersionedIndex& versionedIndex)
: PathId(pathId)
, VersionedIndex(versionedIndex) {
Y_UNUSED(PathId);
}
TSchemeActualizer(const ui64 pathId, const TVersionedIndex& versionedIndex);
};

}
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ void TGranuleMeta::UpsertPortionOnLoad(const std::shared_ptr<TPortionInfo>& port

void TGranuleMeta::BuildActualizationTasks(NActualizer::TTieringProcessContext& context, const TDuration actualizationLag) const {
if (context.GetActualInstant() < NextActualizations) {
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "skip_actualization")("waiting", NextActualizations - context.GetActualInstant());
return;
}
NActualizer::TExternalTasksContext extTasks(Portions);
Expand Down
4 changes: 3 additions & 1 deletion ydb/core/tx/columnshard/engines/storage/granule/granule.h
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,9 @@ class TGranuleMeta: TNonCopyable {

DataAccessorsManager->AskData(request);
}

if (ActualizationIndex->IsStarted()) {
RefreshScheme();
}
}

const TGranuleAdditiveSummary& GetAdditiveSummary() const;
Expand Down
2 changes: 2 additions & 0 deletions ydb/library/services/services.proto
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,8 @@ enum EServiceKikimr {
S3_WRAPPER = 803;
CONTINUOUS_BACKUP = 804;

TX_COLUMNSHARD_ACTUALIZATION = 850;

// System views
SYSTEM_VIEWS = 900;

Expand Down

0 comments on commit a8ab3a2

Please sign in to comment.