Skip to content

Commit

Permalink
Merge 03f3c24 into e9534cc
Browse files Browse the repository at this point in the history
  • Loading branch information
ivanmorozov333 authored Aug 3, 2024
2 parents e9534cc + 03f3c24 commit 40ab61f
Show file tree
Hide file tree
Showing 24 changed files with 431 additions and 145 deletions.
1 change: 0 additions & 1 deletion ydb/core/protos/config.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1516,7 +1516,6 @@ message TColumnShardConfig {

optional TIndexMetadataMemoryLimit IndexMetadataMemoryLimit = 12;
optional bool CleanupEnabled = 13 [default = true];
optional uint32 RemovedPortionLivetimeSeconds = 14 [default = 600];

message TRepairInfo {
optional string ClassName = 1;
Expand Down
14 changes: 13 additions & 1 deletion ydb/core/tx/columnshard/columnshard.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ void TColumnShard::SwitchToWork(const TActorContext& ctx) {
EnqueueBackgroundActivities();
BackgroundSessionsManager->Start();
ctx.Send(SelfId(), new TEvPrivate::TEvPeriodicWakeup());
ctx.Send(SelfId(), new TEvPrivate::TEvPingSnapshotsUsage());
NYDBTest::TControllers::GetColumnShardController()->OnSwitchToWork(TabletID());
AFL_VERIFY(!!StartInstant);
Counters.GetCSCounters().Initialization.OnSwitchToWork(TMonotonic::Now() - *StartInstant, TMonotonic::Now() - CreateInstant);
Expand Down Expand Up @@ -161,7 +162,9 @@ void TColumnShard::Handle(TEvPrivate::TEvReadFinished::TPtr& ev, const TActorCon
if (HasIndex()) {
index = &GetIndexAs<NOlap::TColumnEngineForLogs>().GetVersionedIndex();
}
InFlightReadsTracker.RemoveInFlightRequest(ev->Get()->RequestCookie, index);

InFlightReadsTracker.RemoveInFlightRequest(
ev->Get()->RequestCookie, index, TInstant::Now());

ui64 txId = ev->Get()->TxId;
if (ScanTxInFlight.contains(txId)) {
Expand All @@ -173,6 +176,14 @@ void TColumnShard::Handle(TEvPrivate::TEvReadFinished::TPtr& ev, const TActorCon
}
}

void TColumnShard::Handle(TEvPrivate::TEvPingSnapshotsUsage::TPtr& /*ev*/, const TActorContext& ctx) {
if (auto writeTx = InFlightReadsTracker.Ping(
this, NYDBTest::TControllers::GetColumnShardController()->GetPingCheckPeriod(0.6 * GetMaxReadStaleness()), TInstant::Now())) {
Execute(writeTx.release(), ctx);
}
ctx.Schedule(0.3 * GetMaxReadStaleness(), new TEvPrivate::TEvPingSnapshotsUsage());
}

void TColumnShard::Handle(TEvPrivate::TEvPeriodicWakeup::TPtr& ev, const TActorContext& ctx) {
if (ev->Get()->Manual) {
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "TEvPrivate::TEvPeriodicWakeup::MANUAL")("tablet_id", TabletID());
Expand All @@ -182,6 +193,7 @@ void TColumnShard::Handle(TEvPrivate::TEvPeriodicWakeup::TPtr& ev, const TActorC
SendWaitPlanStep(GetOutdatedStep());

SendPeriodicStats();
EnqueueBackgroundActivities();
ctx.Schedule(PeriodicWakeupActivationPeriod, new TEvPrivate::TEvPeriodicWakeup());
}
}
Expand Down
8 changes: 8 additions & 0 deletions ydb/core/tx/columnshard/columnshard__init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,14 @@ bool TTxInit::ReadEverything(TTransactionContext& txc, const TActorContext& ctx)
}
Self->SharingSessionsManager = local;
}
{
TMemoryProfileGuard g("TTxInit/TInFlightReadsTracker");
TInFlightReadsTracker local(Self->StoragesManager, Self->Counters.GetRequestsTracingCounters());
if (!local.LoadFromDatabase(txc.DB)) {
return false;
}
Self->InFlightReadsTracker = std::move(local);
}

Self->UpdateInsertTableCounters();
Self->UpdateIndexCounters();
Expand Down
29 changes: 19 additions & 10 deletions ydb/core/tx/columnshard/columnshard_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ TColumnShard::TColumnShard(TTabletStorageInfo* info, const TActorId& tablet)
, TabletCountersHolder(new TProtobufTabletCounters<ESimpleCounters_descriptor, ECumulativeCounters_descriptor,
EPercentileCounters_descriptor, ETxTypes_descriptor>())
, Counters(*TabletCountersHolder)
, InFlightReadsTracker(StoragesManager)
, InFlightReadsTracker(StoragesManager, Counters.GetRequestsTracingCounters())
, TablesManager(StoragesManager, info->TabletID)
, Subscribers(std::make_shared<NSubscriber::TManager>(*this))
, PipeClientCache(NTabletPipe::CreateBoundedClientCache(new NTabletPipe::TBoundedClientCacheConfig(), GetPipeClientConfig()))
Expand All @@ -84,8 +84,7 @@ TColumnShard::TColumnShard(TTabletStorageInfo* info, const TActorId& tablet)
, TTLTaskSubscription(NOlap::TTTLColumnEngineChanges::StaticTypeName(), Counters.GetSubscribeCounters())
, BackgroundController(Counters.GetBackgroundControllerCounters())
, NormalizerController(StoragesManager, Counters.GetSubscribeCounters())
, SysLocks(this)
, MaxReadStaleness(TDuration::MilliSeconds(AppDataVerified().ColumnShardConfig.GetMaxReadStaleness_ms())) {
, SysLocks(this) {
}

void TColumnShard::OnDetach(const TActorContext& ctx) {
Expand Down Expand Up @@ -186,12 +185,18 @@ ui64 TColumnShard::GetOutdatedStep() const {
return step;
}

ui64 TColumnShard::GetMinReadStep() const {
const TDuration maxReadStaleness = NYDBTest::TControllers::GetColumnShardController()->GetReadTimeoutClean(MaxReadStaleness);
ui64 delayMillisec = maxReadStaleness.MilliSeconds();
NOlap::TSnapshot TColumnShard::GetMinReadSnapshot() const {
ui64 delayMillisec = GetMaxReadStaleness().MilliSeconds();
ui64 passedStep = GetOutdatedStep();
ui64 minReadStep = (passedStep > delayMillisec ? passedStep - delayMillisec : 0);
return minReadStep;
Counters.GetRequestsTracingCounters()->OnDefaultMinSnapshotInstant(TInstant::MilliSeconds(minReadStep));

if (auto ssClean = InFlightReadsTracker.GetSnapshotToClean()) {
if (ssClean->GetPlanStep() < minReadStep) {
return *ssClean;
}
}
return NOlap::TSnapshot::MaxForPlanStep(minReadStep);
}

TWriteId TColumnShard::HasLongTxWrite(const NLongTxService::TLongTxId& longTxId, const ui32 partId) const {
Expand Down Expand Up @@ -785,9 +790,8 @@ void TColumnShard::SetupCleanupPortions() {
return;
}

NOlap::TSnapshot cleanupSnapshot{GetMinReadStep(), 0};

auto changes = TablesManager.MutablePrimaryIndex().StartCleanupPortions(cleanupSnapshot, TablesManager.GetPathsToDrop(), DataLocksManager);
auto changes =
TablesManager.MutablePrimaryIndex().StartCleanupPortions(GetMinReadSnapshot(), TablesManager.GetPathsToDrop(), DataLocksManager);
if (!changes) {
ACFL_DEBUG("background", "cleanup")("skip_reason", "no_changes");
return;
Expand Down Expand Up @@ -1134,4 +1138,9 @@ const NKikimr::NColumnShard::NTiers::TManager* TColumnShard::GetTierManagerPoint
return Tiers->GetManagerOptional(tierId);
}

TDuration TColumnShard::GetMaxReadStaleness() {
return NYDBTest::TControllers::GetColumnShardController()->GetReadTimeoutClean(
TDuration::MilliSeconds(AppDataVerified().ColumnShardConfig.GetMaxReadStaleness_ms()));
}

}
8 changes: 6 additions & 2 deletions ydb/core/tx/columnshard/columnshard_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,8 @@ class TColumnShard
void Handle(TEvPrivate::TEvScanStats::TPtr &ev, const TActorContext &ctx);
void Handle(TEvPrivate::TEvReadFinished::TPtr &ev, const TActorContext &ctx);
void Handle(TEvPrivate::TEvPeriodicWakeup::TPtr& ev, const TActorContext& ctx);
void Handle(TEvPrivate::TEvPingSnapshotsUsage::TPtr& ev, const TActorContext& ctx);

void Handle(TEvPrivate::TEvWriteIndex::TPtr& ev, const TActorContext& ctx);
void Handle(NMetadata::NProvider::TEvRefreshSubscriberData::TPtr& ev);
void Handle(NEvents::TDataEvents::TEvWrite::TPtr& ev, const TActorContext& ctx);
Expand Down Expand Up @@ -361,6 +363,8 @@ class TColumnShard
HFunc(TEvPrivate::TEvScanStats, Handle);
HFunc(TEvPrivate::TEvReadFinished, Handle);
HFunc(TEvPrivate::TEvPeriodicWakeup, Handle);
HFunc(TEvPrivate::TEvPingSnapshotsUsage, Handle);

HFunc(NEvents::TDataEvents::TEvWrite, Handle);
HFunc(TEvPrivate::TEvWriteDraft, Handle);
HFunc(TEvPrivate::TEvGarbageCollectionFinished, Handle);
Expand Down Expand Up @@ -465,7 +469,7 @@ class TColumnShard
TLimits Limits;
NOlap::TNormalizationController NormalizerController;
NDataShard::TSysLocks SysLocks;
const TDuration MaxReadStaleness;
static TDuration GetMaxReadStaleness();

void TryRegisterMediatorTimeCast();
void UnregisterMediatorTimeCast();
Expand All @@ -475,7 +479,7 @@ class TColumnShard
void SendWaitPlanStep(ui64 step);
void RescheduleWaitingReads();
NOlap::TSnapshot GetMaxReadVersion() const;
ui64 GetMinReadStep() const;
NOlap::TSnapshot GetMinReadSnapshot() const;
ui64 GetOutdatedStep() const;
TDuration GetTxCompleteLag() const {
ui64 mediatorTime = MediatorTimeCastEntry ? MediatorTimeCastEntry->Get(TabletID()) : 0;
Expand Down
7 changes: 6 additions & 1 deletion ydb/core/tx/columnshard/columnshard_private_events.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ struct TEvPrivate {
EvExportSaveCursor,

EvTaskProcessedResult,
EvPingSnapshotsUsage,

EvEnd
};
Expand Down Expand Up @@ -158,7 +159,11 @@ struct TEvPrivate {
bool Manual;
};

class TEvWriteBlobsResult : public TEventLocal<TEvWriteBlobsResult, EvWriteBlobsResult> {
struct TEvPingSnapshotsUsage: public TEventLocal<TEvPingSnapshotsUsage, EvPingSnapshotsUsage> {
TEvPingSnapshotsUsage() = default;
};

class TEvWriteBlobsResult: public TEventLocal<TEvWriteBlobsResult, EvWriteBlobsResult> {
private:
NColumnShard::TBlobPutResult::TPtr PutResult;
NOlap::TWritingBuffer WritesBuffer;
Expand Down
14 changes: 12 additions & 2 deletions ydb/core/tx/columnshard/columnshard_schema.h
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,8 @@ struct Schema : NIceDb::Schema {
TableVersionInfo = 11,
SmallBlobs = 12,
OneToOneEvictedBlobs = 13,
BlobsToDeleteWT = 14
BlobsToDeleteWT = 14,
InFlightSnapshots = 15
};

// Tablet tables
Expand Down Expand Up @@ -250,6 +251,14 @@ struct Schema : NIceDb::Schema {
using TColumns = TableColumns<BlobId, TabletId>;
};

struct InFlightSnapshots: Table<(ui32)ECommonTables::InFlightSnapshots> {
struct PlanStep: Column<1, NScheme::NTypeIds::Uint64> {};
struct TxId: Column<2, NScheme::NTypeIds::Uint64> {};

using TKey = TableKey<PlanStep, TxId>;
using TColumns = TableColumns<PlanStep, TxId>;
};

// Index tables

// InsertTable - common for all indices
Expand Down Expand Up @@ -545,7 +554,8 @@ struct Schema : NIceDb::Schema {
BackgroundSessions,
ShardingInfo,
Normalizers,
NormalizerEvents
NormalizerEvents,
InFlightSnapshots
>;

//
Expand Down
8 changes: 8 additions & 0 deletions ydb/core/tx/columnshard/common/snapshot.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,4 +35,12 @@ TString TSnapshot::SerializeToString() const {
return SerializeToProto().SerializeAsString();
}

NKikimr::NOlap::TSnapshot TSnapshot::MaxForPlanStep(const ui64 planStep) noexcept {
return TSnapshot(planStep, ::Max<ui64>());
}

NKikimr::NOlap::TSnapshot TSnapshot::MaxForPlanInstant(const TInstant planInstant) noexcept {
return TSnapshot(planInstant.MilliSeconds(), ::Max<ui64>());
}

};
4 changes: 4 additions & 0 deletions ydb/core/tx/columnshard/common/snapshot.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@ class TSnapshot {
return TSnapshot(-1ll, -1ll);
}

static TSnapshot MaxForPlanInstant(const TInstant planInstant) noexcept;

static TSnapshot MaxForPlanStep(const ui64 planStep) noexcept;

constexpr bool operator==(const TSnapshot&) const noexcept = default;

constexpr auto operator<=>(const TSnapshot&) const noexcept = default;
Expand Down
5 changes: 5 additions & 0 deletions ydb/core/tx/columnshard/counters/counters_manager.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
#include "counters_manager.h"

namespace NKikimr::NColumnShard {

} // namespace NKikimr::NColumnShard
22 changes: 13 additions & 9 deletions ydb/core/tx/columnshard/counters/counters_manager.h
Original file line number Diff line number Diff line change
@@ -1,20 +1,22 @@
#pragma once

#include "background_controller.h"
#include "column_tables.h"
#include "columnshard.h"
#include "indexation.h"
#include "req_tracer.h"
#include "scan.h"
#include "column_tables.h"
#include "writes_monitor.h"
#include "tablet_counters.h"
#include "background_controller.h"
#include "writes_monitor.h"

#include <ydb/core/tx/columnshard/engines/column_engine.h>
#include <ydb/core/tablet_flat/tablet_flat_executor.h>
#include <ydb/core/protos/table_stats.pb.h>
#include <ydb/core/tablet/tablet_counters.h>
#include <ydb/core/base/appdata_fwd.h>
#include <ydb/core/protos/counters_columnshard.pb.h>
#include <ydb/core/protos/counters_datashard.pb.h>
#include <ydb/core/base/appdata_fwd.h>
#include <ydb/core/protos/table_stats.pb.h>
#include <ydb/core/tablet/tablet_counters.h>
#include <ydb/core/tablet_flat/tablet_flat_executor.h>
#include <ydb/core/tx/columnshard/engines/column_engine.h>

#include <library/cpp/time_provider/time_provider.h>

namespace NKikimr::NColumnShard {
Expand All @@ -32,6 +34,7 @@ class TCountersManager {
YDB_READONLY(TIndexationCounters, IndexationCounters, TIndexationCounters("Indexation"));
YDB_READONLY(TIndexationCounters, CompactionCounters, TIndexationCounters("GeneralCompaction"));
YDB_READONLY(TScanCounters, ScanCounters, TScanCounters("Scan"));
YDB_READONLY_DEF(std::shared_ptr<TRequestsTracerCounters>, RequestsTracingCounters);
YDB_READONLY_DEF(std::shared_ptr<NOlap::NResourceBroker::NSubscribe::TSubscriberCounters>, SubscribeCounters);

public:
Expand All @@ -40,8 +43,9 @@ class TCountersManager {
, WritesMonitor(std::make_shared<TWritesMonitor>(tabletCounters))
, BackgroundControllerCounters(std::make_shared<TBackgroundControllerCounters>())
, ColumnTablesCounters(std::make_shared<TColumnTablesCounters>())
, RequestsTracingCounters(std::make_shared<TRequestsTracerCounters>())
, SubscribeCounters(std::make_shared<NOlap::NResourceBroker::NSubscribe::TSubscriberCounters>()) {
}
};

} // namespace NKikimr::NColumnShard
} // namespace NKikimr::NColumnShard
5 changes: 5 additions & 0 deletions ydb/core/tx/columnshard/counters/req_tracer.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
#include "req_tracer.h"

namespace NKikimr::NColumnShard {

}
51 changes: 51 additions & 0 deletions ydb/core/tx/columnshard/counters/req_tracer.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
#pragma once
#include "common/owner.h"
#include <ydb/core/tx/columnshard/common/snapshot.h>

namespace NKikimr::NColumnShard {

class TRequestsTracerCounters: public TCommonCountersOwner {
private:
using TBase = TCommonCountersOwner;
NMonitoring::TDynamicCounters::TCounterPtr RequestedMinSnapshotAge;
NMonitoring::TDynamicCounters::TCounterPtr DefaultMinSnapshotAge;
NMonitoring::TDynamicCounters::TCounterPtr SnapshotsCount;
NMonitoring::TDynamicCounters::TCounterPtr SnapshotLock;
NMonitoring::TDynamicCounters::TCounterPtr SnapshotUnlock;

public:

TRequestsTracerCounters()
: TBase("cs_requests_tracing")
, RequestedMinSnapshotAge(TBase::GetValue("Snapshots/RequestedAge/Seconds"))
, DefaultMinSnapshotAge(TBase::GetValue("Snapshots/DefaultAge/Seconds"))
, SnapshotsCount(TBase::GetValue("Snapshots/Count"))
, SnapshotLock(TBase::GetDeriviative("Snapshots/Lock"))
, SnapshotUnlock(TBase::GetDeriviative("Snapshots/Unlock"))
{

}

void OnDefaultMinSnapshotInstant(const TInstant instant) const {
DefaultMinSnapshotAge->Set((TInstant::Now() - instant).Seconds());
}

void OnSnapshotsInfo(const ui32 count, const std::optional<NOlap::TSnapshot> snapshotPlanStep) const {
if (snapshotPlanStep) {
RequestedMinSnapshotAge->Set((TInstant::Now() - snapshotPlanStep->GetPlanInstant()).Seconds());
} else {
RequestedMinSnapshotAge->Set(0);
}
SnapshotsCount->Set(count);

}

void OnSnapshotLocked() const {
SnapshotLock->Add(1);
}
void OnSnapshotUnlocked() const {
SnapshotUnlock->Add(1);
}
};

}
12 changes: 7 additions & 5 deletions ydb/core/tx/columnshard/counters/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,16 @@ LIBRARY()

SRCS(
background_controller.cpp
column_tables.cpp
indexation.cpp
scan.cpp
engine_logs.cpp
counters_manager.cpp
blobs_manager.cpp
column_tables.cpp
columnshard.cpp
insert_table.cpp
common_data.cpp
engine_logs.cpp
indexation.cpp
insert_table.cpp
req_tracer.cpp
scan.cpp
splitter.cpp
)

Expand Down
Loading

0 comments on commit 40ab61f

Please sign in to comment.