Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

correct synchronization after tablet reboot #12296

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion ydb/core/kqp/ut/olap/indexes_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,7 @@ Y_UNIT_TEST_SUITE(KqpOlapIndexes) {
CompareYson(result, R"([[20000u;]])");
}

AFL_VERIFY(updatesCount + 3 /*tablets count*/ * 1 /*normalizers*/ ==
AFL_VERIFY(updatesCount + 6 ==
(ui64)csController->GetActualizationRefreshSchemeCount().Val())(
"updates", updatesCount)("count",
csController->GetActualizationRefreshSchemeCount().Val());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,16 @@ class TTieringProcessContext {
return Tasks;
}

TString DebugString() const {
TStringBuilder result;
result << "{";
for (auto&& i : Tasks) {
result << i.first.DebugString() << ":" << i.second.size() << ";";
}
result << "}";
return result;
}

bool AddPortion(const std::shared_ptr<const TPortionInfo>& info, TPortionEvictionFeatures&& features, const std::optional<TDuration> dWait);

bool IsRWAddressAvailable(const TRWAddress& address) const {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,14 @@ class TController {

public:
void StartActualization(const NActualizer::TRWAddress& address) {
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_ACTUALIZATION)("event", "actualization_start")("count", ActualizationsInProgress[address])(
"limit", GetLimitForAddress(address))("rw", address.DebugString());
AFL_VERIFY(++ActualizationsInProgress[address] <= (i32)GetLimitForAddress(address));
}

void FinishActualization(const NActualizer::TRWAddress& address) {
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_ACTUALIZATION)("event", "actualization_finished")("count", ActualizationsInProgress[address])(
"limit", GetLimitForAddress(address))("rw", address.DebugString());
AFL_VERIFY(--ActualizationsInProgress[address] >= 0);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@

namespace NKikimr::NOlap {

class TCleanupPortionsColumnEngineChanges: public TColumnEngineChanges {
class TCleanupPortionsColumnEngineChanges: public TColumnEngineChanges,
public NColumnShard::TMonitoringObjectsCounter<TCleanupPortionsColumnEngineChanges> {
private:
using TBase = TColumnEngineChanges;
THashMap<TString, std::vector<std::shared_ptr<TPortionInfo>>> StoragePortions;
Expand Down
3 changes: 2 additions & 1 deletion ydb/core/tx/columnshard/engines/changes/cleanup_tables.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@

namespace NKikimr::NOlap {

class TCleanupTablesColumnEngineChanges: public TColumnEngineChanges {
class TCleanupTablesColumnEngineChanges: public TColumnEngineChanges,
public NColumnShard::TMonitoringObjectsCounter<TCleanupTablesColumnEngineChanges> {
private:
using TBase = TColumnEngineChanges;
protected:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@

namespace NKikimr::NOlap::NCompaction {

class TGeneralCompactColumnEngineChanges: public TCompactColumnEngineChanges {
class TGeneralCompactColumnEngineChanges: public TCompactColumnEngineChanges,
public NColumnShard::TMonitoringObjectsCounter<TGeneralCompactColumnEngineChanges> {
private:
YDB_ACCESSOR(ui64, PortionExpectedSize, 1.5 * (1 << 20));
using TBase = TCompactColumnEngineChanges;
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/tx/columnshard/engines/changes/indexation.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

namespace NKikimr::NOlap {

class TInsertColumnEngineChanges: public TChangesWithAppend {
class TInsertColumnEngineChanges: public TChangesWithAppend, public NColumnShard::TMonitoringObjectsCounter<TInsertColumnEngineChanges> {
private:
using TBase = TChangesWithAppend;
std::vector<TCommittedData> DataToIndex;
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/tx/columnshard/engines/changes/ttl.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

namespace NKikimr::NOlap {

class TTTLColumnEngineChanges: public TChangesWithAppend {
class TTTLColumnEngineChanges: public TChangesWithAppend, public NColumnShard::TMonitoringObjectsCounter<TTTLColumnEngineChanges> {
private:
using TBase = TChangesWithAppend;

Expand Down
6 changes: 4 additions & 2 deletions ydb/core/tx/columnshard/engines/column_engine_logs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -414,7 +414,7 @@ std::shared_ptr<TCleanupPortionsColumnEngineChanges> TColumnEngineForLogs::Start
std::vector<std::shared_ptr<TTTLColumnEngineChanges>> TColumnEngineForLogs::StartTtl(const THashMap<ui64, TTiering>& pathEviction,
const std::shared_ptr<NDataLocks::TManager>& dataLocksManager, const ui64 memoryUsageLimit) noexcept {
AFL_VERIFY(dataLocksManager);
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "StartTtl")("external", pathEviction.size());
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_ACTUALIZATION)("event", "StartTtl")("external", pathEviction.size());

TSaverContext saverContext(StoragesManager);
NActualizer::TTieringProcessContext context(memoryUsageLimit, saverContext, dataLocksManager, VersionedIndex, SignalCounters, ActualizationController);
Expand All @@ -440,10 +440,12 @@ std::vector<std::shared_ptr<TTTLColumnEngineChanges>> TColumnEngineForLogs::Star
i.second->BuildActualizationTasks(context, actualizationLag);
}
} else {
AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "StartTtl")("skip", "not_ready_tiers");
AFL_WARN(NKikimrServices::TX_COLUMNSHARD_ACTUALIZATION)("event", "StartTtl")("skip", "not_ready_tiers");
}
std::vector<std::shared_ptr<TTTLColumnEngineChanges>> result;
AFL_WARN(NKikimrServices::TX_COLUMNSHARD_ACTUALIZATION)("event", "StartTtl")("rw_tasks_count", context.GetTasks().size());
for (auto&& i : context.GetTasks()) {
AFL_WARN(NKikimrServices::TX_COLUMNSHARD_ACTUALIZATION)("event", "StartTtl")("rw", i.first.DebugString())("count", i.second.size());
for (auto&& t : i.second) {
SignalCounters.OnActualizationTask(t.GetTask()->GetPortionsToEvictCount(), t.GetTask()->GetPortionsToRemoveSize());
result.emplace_back(t.GetTask());
Expand Down
9 changes: 5 additions & 4 deletions ydb/core/tx/columnshard/engines/scheme/column/info.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,21 +64,22 @@ std::vector<std::shared_ptr<NKikimr::NOlap::IPortionDataChunk>> TSimpleColumnInf
AFL_VERIFY(Loader);
const auto checkNeedActualize = [&]() {
if (!Serializer.IsEqualTo(sourceColumnFeatures.Serializer)) {
AFL_NOTICE(NKikimrServices::TX_COLUMNSHARD)("event", "actualization")("reason", "serializer")
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_ACTUALIZATION)("event", "actualization")("reason", "serializer")
("from", sourceColumnFeatures.Serializer.SerializeToProto().DebugString())
("to", Serializer.SerializeToProto().DebugString());
return true;
}
if (!Loader->IsEqualTo(*sourceColumnFeatures.Loader)) {
AFL_NOTICE(NKikimrServices::TX_COLUMNSHARD)("event", "actualization")("reason", "loader");
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_ACTUALIZATION)("event", "actualization")("reason", "loader");
return true;
}
if (!!DictionaryEncoding != !!sourceColumnFeatures.DictionaryEncoding) {
AFL_NOTICE(NKikimrServices::TX_COLUMNSHARD)("event", "actualization")("reason", "dictionary")("from", !!sourceColumnFeatures.DictionaryEncoding)("to", !!DictionaryEncoding);
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_ACTUALIZATION)("event", "actualization")("reason", "dictionary")(
"from", !!sourceColumnFeatures.DictionaryEncoding)("to", !!DictionaryEncoding);
return true;
}
if (!!DictionaryEncoding && !DictionaryEncoding->IsEqualTo(*sourceColumnFeatures.DictionaryEncoding)) {
AFL_NOTICE(NKikimrServices::TX_COLUMNSHARD)("event", "actualization")("reason", "dictionary_encoding")
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_ACTUALIZATION)("event", "actualization")("reason", "dictionary_encoding")
("from", sourceColumnFeatures.DictionaryEncoding->SerializeToProto().DebugString())
("to", DictionaryEncoding->SerializeToProto().DebugString())
;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ class TGranuleActualizationIndex {
public:
std::vector<TCSMetadataRequest> CollectMetadataRequests(const THashMap<ui64, TPortionInfo::TPtr>& portions);

bool IsStarted() const {
return Actualizers.size();
}

void Start();
TGranuleActualizationIndex(const ui64 pathId, const TVersionedIndex& versionedIndex);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,31 @@ class TSchemeGlobalCounters: public NColumnShard::TCommonCountersOwner {

std::shared_ptr<NColumnShard::TValueAggregationAgent> QueueSizeInternalWrite;
std::shared_ptr<NColumnShard::TValueAggregationAgent> QueueSizeExternalWrite;

NMonitoring::TDynamicCounters::TCounterPtr Extracts;
NMonitoring::TDynamicCounters::TCounterPtr SkipNotOptimized;
NMonitoring::TDynamicCounters::TCounterPtr SkipNotReadyWrite;
NMonitoring::TDynamicCounters::TCounterPtr SkipPortionNotActualizable;
NMonitoring::TDynamicCounters::TCounterPtr EmptyTargetSchema;
NMonitoring::TDynamicCounters::TCounterPtr RefreshEmpty;
NMonitoring::TDynamicCounters::TCounterPtr SkipPortionToRemove;
NMonitoring::TDynamicCounters::TCounterPtr RefreshValue;
NMonitoring::TDynamicCounters::TCounterPtr AddPortion;
NMonitoring::TDynamicCounters::TCounterPtr RemovePortion;

public:
TSchemeGlobalCounters()
: TBase("SchemeActualizer")
{
, Extracts(TBase::GetDeriviative("Extracts/Count"))
, SkipNotOptimized(TBase::GetDeriviative("SkipNotOptimized/Count"))
, SkipNotReadyWrite(TBase::GetDeriviative("SkipNotReadyWrite/Count"))
, SkipPortionNotActualizable(TBase::GetDeriviative("SkipPortionNotActualizable/Count"))
, EmptyTargetSchema(TBase::GetDeriviative("EmptyTargetSchema/Count"))
, RefreshEmpty(TBase::GetDeriviative("RefreshEmpty/Count"))
, SkipPortionToRemove(TBase::GetDeriviative("SkipPortionToRemove/Count"))
, RefreshValue(TBase::GetDeriviative("RefreshValue/Count"))
, AddPortion(TBase::GetDeriviative("AddPortion/Count"))
, RemovePortion(TBase::GetDeriviative("RemovePortion/Count")) {
QueueSizeExternalWrite = TBase::GetValueAutoAggregations("Granule/Scheme/Actualization/QueueSize/ExternalWrite");
QueueSizeInternalWrite = TBase::GetValueAutoAggregations("Granule/Scheme/Actualization/QueueSize/InternalWrite");
}
Expand All @@ -28,7 +49,36 @@ class TSchemeGlobalCounters: public NColumnShard::TCommonCountersOwner {
static std::shared_ptr<NColumnShard::TValueAggregationClient> BuildQueueSizeInternalWrite() {
return Singleton<TSchemeGlobalCounters>()->QueueSizeInternalWrite->GetClient();
}

static void OnAddPortion() {
Singleton<TSchemeGlobalCounters>()->AddPortion->Add(1);
}
static void OnRemovePortion() {
Singleton<TSchemeGlobalCounters>()->RemovePortion->Add(1);
}
static void OnSkipPortionNotActualizable() {
Singleton<TSchemeGlobalCounters>()->SkipPortionNotActualizable->Add(1);
}
static void OnEmptyTargetSchema() {
Singleton<TSchemeGlobalCounters>()->EmptyTargetSchema->Add(1);
}
static void OnRefreshEmpty() {
Singleton<TSchemeGlobalCounters>()->RefreshEmpty->Add(1);
}
static void OnSkipPortionToRemove() {
Singleton<TSchemeGlobalCounters>()->SkipPortionToRemove->Add(1);
}
static void OnRefreshValue() {
Singleton<TSchemeGlobalCounters>()->RefreshValue->Add(1);
}
static void OnExtract() {
Singleton<TSchemeGlobalCounters>()->Extracts->Add(1);
}
static void OnSkipNotOptimized() {
Singleton<TSchemeGlobalCounters>()->SkipNotOptimized->Add(1);
}
static void OnSkipNotReadyWrite() {
Singleton<TSchemeGlobalCounters>()->SkipNotReadyWrite->Add(1);
}
};

class TSchemeCounters {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ std::optional<NKikimr::NOlap::NActualizer::TSchemeActualizer::TFullActualization

void TSchemeActualizer::DoAddPortion(const TPortionInfo& info, const TAddExternalContext& addContext) {
if (!TargetSchema) {
TSchemeGlobalCounters::OnEmptyTargetSchema();
return;
}
if (!addContext.GetPortionExclusiveGuarantee()) {
Expand All @@ -34,8 +35,10 @@ void TSchemeActualizer::DoAddPortion(const TPortionInfo& info, const TAddExterna
}
auto actualizationInfo = BuildActualizationInfo(info);
if (!actualizationInfo) {
TSchemeGlobalCounters::OnSkipPortionNotActualizable();
return;
}
TSchemeGlobalCounters::OnAddPortion();
NYDBTest::TControllers::GetColumnShardController()->AddPortionForActualizer(1);
AFL_VERIFY(PortionsToActualizeScheme[actualizationInfo->GetAddress()].emplace(info.GetPortionId()).second);
AFL_VERIFY(PortionsInfo.emplace(info.GetPortionId(), actualizationInfo->ExtractFindId()).second);
Expand All @@ -44,6 +47,7 @@ void TSchemeActualizer::DoAddPortion(const TPortionInfo& info, const TAddExterna
void TSchemeActualizer::DoRemovePortion(const ui64 portionId) {
auto it = PortionsInfo.find(portionId);
if (it == PortionsInfo.end()) {
TSchemeGlobalCounters::OnSkipPortionToRemove();
return;
}
auto itAddress = PortionsToActualizeScheme.find(it->second.GetRWAddress());
Expand All @@ -53,19 +57,25 @@ void TSchemeActualizer::DoRemovePortion(const ui64 portionId) {
if (itAddress->second.empty()) {
PortionsToActualizeScheme.erase(itAddress);
}
TSchemeGlobalCounters::OnRemovePortion();
PortionsInfo.erase(it);
}

void TSchemeActualizer::DoExtractTasks(TTieringProcessContext& tasksContext, const TExternalTasksContext& externalContext, TInternalTasksContext& /*internalContext*/) {
THashSet<ui64> portionsToRemove;
TSchemeGlobalCounters::OnExtract();
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_ACTUALIZATION)("rw_count", PortionsToActualizeScheme.size());
for (auto&& [address, portions] : PortionsToActualizeScheme) {
if (!tasksContext.IsRWAddressAvailable(address)) {
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_ACTUALIZATION)("event", "skip_not_ready_for_write");
TSchemeGlobalCounters::OnSkipNotReadyWrite();
continue;
}
for (auto&& portionId : portions) {
auto portion = externalContext.GetPortionVerified(portionId);
if (!address.WriteIs(NBlobOperations::TGlobal::DefaultStorageId) && !address.WriteIs(NTiering::NCommon::DeleteTierName)) {
if (!portion->HasRuntimeFeature(TPortionInfo::ERuntimeFeature::Optimized)) {
TSchemeGlobalCounters::OnSkipNotOptimized();
continue;
}
}
Expand All @@ -76,6 +86,7 @@ void TSchemeActualizer::DoExtractTasks(TTieringProcessContext& tasksContext, con
features.SetTargetTierName(portion->GetTierNameDef(IStoragesManager::DefaultStorageId));

if (!tasksContext.AddPortion(portion, std::move(features), {})) {
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_ACTUALIZATION)("event", "cannot_add_portion")("context", tasksContext.DebugString());
break;
} else {
portionsToRemove.emplace(portion->GetPortionId());
Expand All @@ -97,14 +108,16 @@ void TSchemeActualizer::DoExtractTasks(TTieringProcessContext& tasksContext, con
}
Counters.QueueSizeInternalWrite->SetValue(waitQueueInternal);
Counters.QueueSizeExternalWrite->SetValue(waitQueueExternal);

AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_ACTUALIZATION)("internal_queue", waitQueueInternal)("external_queue", waitQueueExternal);
}

void TSchemeActualizer::Refresh(const TAddExternalContext& externalContext) {
TargetSchema = VersionedIndex.GetLastCriticalSchema();
if (!TargetSchema) {
TSchemeGlobalCounters::OnRefreshEmpty();
AFL_VERIFY(PortionsInfo.empty());
} else {
TSchemeGlobalCounters::OnRefreshValue();
NYDBTest::TControllers::GetColumnShardController()->AddPortionForActualizer(-1 * PortionsInfo.size());
PortionsInfo.clear();
PortionsToActualizeScheme.clear();
Expand All @@ -114,4 +127,10 @@ void TSchemeActualizer::Refresh(const TAddExternalContext& externalContext) {
}
}

TSchemeActualizer::TSchemeActualizer(const ui64 pathId, const TVersionedIndex& versionedIndex)
: PathId(pathId)
, VersionedIndex(versionedIndex) {
Y_UNUSED(PathId);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -63,11 +63,7 @@ class TSchemeActualizer: public IActualizer {
public:
void Refresh(const TAddExternalContext& externalContext);

TSchemeActualizer(const ui64 pathId, const TVersionedIndex& versionedIndex)
: PathId(pathId)
, VersionedIndex(versionedIndex) {
Y_UNUSED(PathId);
}
TSchemeActualizer(const ui64 pathId, const TVersionedIndex& versionedIndex);
};

}
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ void TGranuleMeta::UpsertPortionOnLoad(const std::shared_ptr<TPortionInfo>& port

void TGranuleMeta::BuildActualizationTasks(NActualizer::TTieringProcessContext& context, const TDuration actualizationLag) const {
if (context.GetActualInstant() < NextActualizations) {
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "skip_actualization")("waiting", NextActualizations - context.GetActualInstant());
return;
}
NActualizer::TExternalTasksContext extTasks(Portions);
Expand Down
4 changes: 3 additions & 1 deletion ydb/core/tx/columnshard/engines/storage/granule/granule.h
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,9 @@ class TGranuleMeta: TNonCopyable {

DataAccessorsManager->AskData(request);
}

if (ActualizationIndex->IsStarted()) {
RefreshScheme();
}
}

const TGranuleAdditiveSummary& GetAdditiveSummary() const;
Expand Down
2 changes: 2 additions & 0 deletions ydb/library/services/services.proto
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,8 @@ enum EServiceKikimr {
S3_WRAPPER = 803;
CONTINUOUS_BACKUP = 804;

TX_COLUMNSHARD_ACTUALIZATION = 850;

// System views
SYSTEM_VIEWS = 900;

Expand Down
Loading