Skip to content

Commit

Permalink
Merge b17caf9 into 29362cf
Browse files Browse the repository at this point in the history
  • Loading branch information
swalrus1 authored Dec 9, 2024
2 parents 29362cf + b17caf9 commit 8a88bab
Show file tree
Hide file tree
Showing 14 changed files with 166 additions and 81 deletions.
48 changes: 36 additions & 12 deletions ydb/core/tx/columnshard/columnshard_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -616,7 +616,27 @@ class TChangesReadTask: public NOlap::NBlobOperations::NRead::ITask {
}
};

class TDataAccessorsSubscriber: public NOlap::IDataAccessorRequestsSubscriber {
class TDataAccessorsSubscriberBase: public NOlap::IDataAccessorRequestsSubscriber {
private:
std::shared_ptr<NOlap::NResourceBroker::NSubscribe::TResourcesGuard> ResourcesGuard;

virtual void DoOnRequestsFinished(NOlap::TDataAccessorsResult&& result) override final {
AFL_VERIFY(ResourcesGuard);
DoOnRequestsFinished(std::move(result), std::move(ResourcesGuard));
}

protected:
virtual void DoOnRequestsFinished(NOlap::TDataAccessorsResult&& result, std::shared_ptr<NOlap::NResourceBroker::NSubscribe::TResourcesGuard>&& guard) = 0;

public:
void SetResourcesGuard(const std::shared_ptr<NOlap::NResourceBroker::NSubscribe::TResourcesGuard>& guard) {
AFL_VERIFY(!ResourcesGuard);
AFL_VERIFY(guard);
ResourcesGuard = guard;
}
};

class TDataAccessorsSubscriber: public TDataAccessorsSubscriberBase {
protected:
const NActors::TActorId ShardActorId;
std::shared_ptr<NOlap::TColumnEngineChanges> Changes;
Expand All @@ -625,8 +645,9 @@ class TDataAccessorsSubscriber: public NOlap::IDataAccessorRequestsSubscriber {

virtual void DoOnRequestsFinishedImpl() = 0;

virtual void DoOnRequestsFinished(NOlap::TDataAccessorsResult&& result) override final {
virtual void DoOnRequestsFinished(NOlap::TDataAccessorsResult&& result, std::shared_ptr<NOlap::NResourceBroker::NSubscribe::TResourcesGuard>&& guard) override final {
Changes->SetFetchedDataAccessors(std::move(result), NOlap::TDataAccessorsInitializationContext(VersionedIndex));
Changes->ResourcesGuard = std::move(guard);
DoOnRequestsFinishedImpl();
}

Expand Down Expand Up @@ -822,7 +843,7 @@ class TAccessorsMemorySubscriber: public NOlap::NResourceBroker::NSubscribe::ITa
private:
using TBase = NOlap::NResourceBroker::NSubscribe::ITask;
std::shared_ptr<NOlap::TDataAccessorsRequest> Request;
std::shared_ptr<TDataAccessorsSubscriber> Subscriber;
std::shared_ptr<TDataAccessorsSubscriberBase> Subscriber;
std::shared_ptr<NOlap::NDataAccessorControl::IDataAccessorsManager> DataAccessorsManager;

virtual void DoOnAllocationSuccess(const std::shared_ptr<NOlap::NResourceBroker::NSubscribe::TResourcesGuard>& guard) override {
Expand All @@ -833,7 +854,7 @@ class TAccessorsMemorySubscriber: public NOlap::NResourceBroker::NSubscribe::ITa

public:
TAccessorsMemorySubscriber(const ui64 memory, const TString& externalTaskId, const NOlap::NResourceBroker::NSubscribe::TTaskContext& context,
std::shared_ptr<NOlap::TDataAccessorsRequest>&& request, const std::shared_ptr<TDataAccessorsSubscriber>& subscriber,
std::shared_ptr<NOlap::TDataAccessorsRequest>&& request, const std::shared_ptr<TDataAccessorsSubscriberBase>& subscriber,
const std::shared_ptr<NOlap::NDataAccessorControl::IDataAccessorsManager>& dataAccessorsManager)
: TBase(0, memory, externalTaskId, context)
, Request(std::move(request))
Expand All @@ -852,7 +873,6 @@ class TCompactionDataAccessorsSubscriber: public TDataAccessorsSubscriberWithRea
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "compaction")("external_task_id", externalTaskId);

auto ev = std::make_unique<TEvPrivate::TEvWriteIndex>(VersionedIndex, Changes, CacheDataAfterWrite);
ev->IndexChanges->ResourcesGuard = ExtractResourcesGuard();
TActorContext::AsActorContext().Register(new NOlap::NBlobOperations::NRead::TActor(
std::make_shared<TCompactChangesReadTask>(std::move(ev), ShardActorId, ShardTabletId, Counters, SnapshotModification)));
}
Expand Down Expand Up @@ -895,7 +915,6 @@ class TWriteEvictPortionsDataAccessorsSubscriber: public TDataAccessorsSubscribe
virtual void DoOnRequestsFinishedImpl() override {
ACFL_DEBUG("background", "ttl")("need_writes", true);
auto ev = std::make_unique<TEvPrivate::TEvWriteIndex>(VersionedIndex, Changes, false);
ev->IndexChanges->ResourcesGuard = ExtractResourcesGuard();
TActorContext::AsActorContext().Register(new NOlap::NBlobOperations::NRead::TActor(
std::make_shared<TTTLChangesReadTask>(std::move(ev), ShardActorId, ShardTabletId, Counters, SnapshotModification)));
}
Expand All @@ -920,12 +939,13 @@ class TNoWriteEvictPortionsDataAccessorsSubscriber: public TDataAccessorsSubscri
using TBase::TBase;
};

class TCSMetadataSubscriber: public NOlap::IDataAccessorRequestsSubscriber, public TObjectCounter<TCSMetadataSubscriber> {
class TCSMetadataSubscriber: public TDataAccessorsSubscriberBase, public TObjectCounter<TCSMetadataSubscriber> {
private:
NActors::TActorId TabletActorId;
const std::shared_ptr<NOlap::IMetadataAccessorResultProcessor> Processor;
const ui64 Generation;
virtual void DoOnRequestsFinished(NOlap::TDataAccessorsResult&& result) override {
virtual void DoOnRequestsFinished(NOlap::TDataAccessorsResult&& result, std::shared_ptr<NOlap::NResourceBroker::NSubscribe::TResourcesGuard>&& guard) override {
result.SetResourcesGuard(std::move(guard));
NActors::TActivationContext::Send(
TabletActorId, std::make_unique<TEvPrivate::TEvMetadataAccessorsInfo>(Processor, Generation, std::move(result)));
}
Expand All @@ -947,8 +967,12 @@ void TColumnShard::SetupMetadata() {
}
std::vector<NOlap::TCSMetadataRequest> requests = TablesManager.MutablePrimaryIndex().CollectMetadataRequests();
for (auto&& i : requests) {
i.GetRequest()->RegisterSubscriber(std::make_shared<TCSMetadataSubscriber>(SelfId(), i.GetProcessor(), Generation()));
DataAccessorsManager->AskData(i.GetRequest());
const ui64 accessorsMemory =
i.GetRequest()->PredictAccessorsMemory(TablesManager.GetPrimaryIndex()->GetVersionedIndex().GetLastSchema());
NOlap::NResourceBroker::NSubscribe::ITask::StartResourceSubscription(ResourceSubscribeActor,
std::make_shared<TAccessorsMemorySubscriber>(accessorsMemory, i.GetRequest()->GetTaskId(), TTLTaskSubscription,
std::shared_ptr<NOlap::TDataAccessorsRequest>(i.GetRequest()),
std::make_shared<TCSMetadataSubscriber>(SelfId(), i.GetProcessor(), Generation()), DataAccessorsManager.GetObjectPtrVerified()));
}
}

Expand Down Expand Up @@ -1004,7 +1028,6 @@ class TCleanupPortionsDataAccessorsSubscriber: public TDataAccessorsSubscriber {
virtual void DoOnRequestsFinishedImpl() override {
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("background", "cleanup")("changes_info", Changes->DebugString());
auto ev = std::make_unique<TEvPrivate::TEvWriteIndex>(VersionedIndex, Changes, false);
ev->IndexChanges->ResourcesGuard = ExtractResourcesGuard();
ev->SetPutStatus(NKikimrProto::OK); // No new blobs to write
NActors::TActivationContext::Send(ShardActorId, std::move(ev));
}
Expand Down Expand Up @@ -1093,7 +1116,8 @@ void TColumnShard::Handle(TEvPrivate::TEvStartCompaction::TPtr& ev, const TActor

void TColumnShard::Handle(TEvPrivate::TEvMetadataAccessorsInfo::TPtr& ev, const TActorContext& /*ctx*/) {
AFL_VERIFY(ev->Get()->GetGeneration() == Generation())("ev", ev->Get()->GetGeneration())("tablet", Generation());
ev->Get()->GetProcessor()->ApplyResult(ev->Get()->ExtractResult(), TablesManager.MutablePrimaryIndexAsVerified<NOlap::TColumnEngineForLogs>());
ev->Get()->GetProcessor()->ApplyResult(
ev->Get()->ExtractResult(), TablesManager.MutablePrimaryIndexAsVerified<NOlap::TColumnEngineForLogs>());
SetupMetadata();
}

Expand Down
4 changes: 2 additions & 2 deletions ydb/core/tx/columnshard/columnshard_private_events.h
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,8 @@ struct TEvPrivate {
return std::move(Result);
}

TEvMetadataAccessorsInfo(
const std::shared_ptr<NOlap::IMetadataAccessorResultProcessor>& processor, const ui64 gen, NOlap::TDataAccessorsResult&& result)
TEvMetadataAccessorsInfo(const std::shared_ptr<NOlap::IMetadataAccessorResultProcessor>& processor, const ui64 gen,
NOlap::TDataAccessorsResult&& result)
: Processor(processor)
, Generation(gen)
, Result(std::move(result)) {
Expand Down
18 changes: 17 additions & 1 deletion ydb/core/tx/columnshard/data_accessor/request.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,19 @@
#include <ydb/core/tx/columnshard/counters/common/object_counter.h>
#include <ydb/core/tx/columnshard/engines/portions/data_accessor.h>
#include <ydb/core/tx/columnshard/engines/portions/portion_info.h>
#include <ydb/core/tx/columnshard/resource_subscriber/task.h>

namespace NKikimr::NOlap {

class TDataAccessorsRequest;

class TDataAccessorsResult {
class TDataAccessorsResult : private NNonCopyable::TMoveOnly {
private:
THashMap<ui64, TString> ErrorsByPathId;
THashMap<ui64, std::vector<TPortionDataAccessor>> AccessorsByPathId;
THashMap<ui64, TPortionDataAccessor> PortionsById;
std::vector<TPortionDataAccessor> Portions;
std::shared_ptr<NOlap::NResourceBroker::NSubscribe::TResourcesGuard> ResourcesGuard;

public:
const std::vector<TPortionDataAccessor>& GetPortions() const {
Expand Down Expand Up @@ -60,6 +62,16 @@ class TDataAccessorsResult {
bool HasErrors() const {
return ErrorsByPathId.size();
}

void SetResourcesGuard(std::shared_ptr<NOlap::NResourceBroker::NSubscribe::TResourcesGuard>&& guard) {
AFL_VERIFY(!ResourcesGuard);
AFL_VERIFY(guard);
}

std::shared_ptr<NOlap::NResourceBroker::NSubscribe::TResourcesGuard>&& ExtractResourcesGuard() {
AFL_VERIFY(ResourcesGuard);
return std::move(ResourcesGuard);
}
};

class IDataAccessorRequestsSubscriber: public NColumnShard::TMonitoringObjectsCounter<IDataAccessorRequestsSubscriber> {
Expand Down Expand Up @@ -315,6 +327,10 @@ class TDataAccessorsRequest: public NColumnShard::TMonitoringObjectsCounter<TDat
}
}
}

TString GetTaskId() const {
return TStringBuilder() << "data-accessor-request-" << RequestId;
}
};

} // namespace NKikimr::NOlap
1 change: 1 addition & 0 deletions ydb/core/tx/columnshard/data_accessor/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ PEERDIR(
ydb/core/tx/columnshard/engines/portions
ydb/core/tx/columnshard/data_accessor/abstract
ydb/core/tx/columnshard/data_accessor/local_db
ydb/core/tx/columnshard/resource_subscriber
)

END()
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,8 @@ std::vector<TCSMetadataRequest> TGranuleActualizationIndex::CollectMetadataReque
if (!TieringActualizer) {
return {};
}
auto req = TieringActualizer->BuildMetadataRequest(PathId, portions, TieringActualizer);
if (!req) {
return {};
}
return { *req };
auto reqs = TieringActualizer->BuildMetadataRequests(PathId, portions, TieringActualizer);
return reqs;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,17 @@ std::optional<TTieringActualizer::TFullActualizationInfo> TTieringActualizer::Bu
return {};
}

void TTieringActualizer::AddPortionImpl(const TPortionInfo& portion, const TInstant now) {
auto info = BuildActualizationInfo(portion, now);
if (!info) {
return;
}
AFL_VERIFY(PortionIdByWaitDuration[info->GetAddress()].AddPortion(*info, portion.GetPortionId(), now));
auto address = info->GetAddress();
TFindActualizationInfo findId(std::move(address), info->GetWaitInstant(now));
AFL_VERIFY(PortionsInfo.emplace(portion.GetPortionId(), std::move(findId)).second);
}

void TTieringActualizer::DoAddPortion(const TPortionInfo& portion, const TAddExternalContext& addContext) {
AFL_VERIFY(PathId == portion.GetPathId());
if (!addContext.GetPortionExclusiveGuarantee()) {
Expand Down Expand Up @@ -107,6 +118,9 @@ void TTieringActualizer::ActualizePortionInfo(const TPortionDataAccessor& access
if (!NewPortionIds.erase(accessor.GetPortionInfo().GetPortionId())) {
return;
}
if (NewPortionIds.empty()) {
NYDBTest::TControllers::GetColumnShardController()->OnTieringMetadataActualized();
}
auto& portion = accessor.GetPortionInfo();
if (Tiering) {
std::shared_ptr<ISnapshotSchema> portionSchema = portion.GetSchema(VersionedIndex);
Expand Down Expand Up @@ -201,15 +215,18 @@ void TTieringActualizer::DoExtractTasks(

void TTieringActualizer::Refresh(const std::optional<TTiering>& info, const TAddExternalContext& externalContext) {
Tiering = info;
std::optional<ui32> newTieringColumnId;
if (Tiering) {
TieringColumnId = VersionedIndex.GetLastSchema()->GetColumnId(Tiering->GetEvictColumnName());
} else {
TieringColumnId = {};
newTieringColumnId = VersionedIndex.GetLastSchema()->GetColumnId(Tiering->GetEvictColumnName());
}
TargetCriticalSchema = VersionedIndex.GetLastCriticalSchema();
PortionsInfo.clear();
NewPortionIds.clear();
PortionIdByWaitDuration.clear();
if (newTieringColumnId != TieringColumnId) {
MaxByPortionId.clear();
}
TieringColumnId = newTieringColumnId;

for (auto&& i : externalContext.GetPortions()) {
AddPortion(i.second, externalContext);
Expand Down Expand Up @@ -240,18 +257,32 @@ class TActualizationReply: public IMetadataAccessorResultProcessor {

} // namespace

std::optional<TCSMetadataRequest> TTieringActualizer::BuildMetadataRequest(
std::vector<TCSMetadataRequest> TTieringActualizer::BuildMetadataRequests(
const ui64 /*pathId*/, const THashMap<ui64, TPortionInfo::TPtr>& portions, const std::shared_ptr<TTieringActualizer>& index) {
if (NewPortionIds.empty()) {
return std::nullopt;
NYDBTest::TControllers::GetColumnShardController()->OnTieringMetadataActualized();
return {};
}
std::shared_ptr<TDataAccessorsRequest> result = std::make_shared<TDataAccessorsRequest>();

const ui64 batchMemorySoftLimit = !NYDBTest::TControllers::GetColumnShardController()->GetMetadataRequestSoftMemoryLimit();
std::vector<TCSMetadataRequest> requests;
std::shared_ptr<TDataAccessorsRequest> currentRequest;
for (auto&& i : NewPortionIds) {
if (!currentRequest) {
currentRequest = std::make_shared<TDataAccessorsRequest>();
}
auto it = portions.find(i);
AFL_VERIFY(it != portions.end());
result->AddPortion(it->second);
currentRequest->AddPortion(it->second);
if (currentRequest->PredictAccessorsMemory(it->second->GetSchema(VersionedIndex)) >= batchMemorySoftLimit) {
requests.emplace_back(currentRequest, std::make_shared<TActualizationReply>(index));
currentRequest.reset();
}
}
if (currentRequest) {
requests.emplace_back(std::move(currentRequest), std::make_shared<TActualizationReply>(index));
}
return TCSMetadataRequest(result, std::make_shared<TActualizationReply>(index));
return requests;
}

} // namespace NKikimr::NOlap::NActualizer
Original file line number Diff line number Diff line change
Expand Up @@ -126,23 +126,14 @@ class TTieringActualizer: public IActualizer {

std::optional<TFullActualizationInfo> BuildActualizationInfo(const TPortionInfo& portion, const TInstant now) const;

void AddPortionImpl(const TPortionInfo& portion, const TInstant now) {
auto info = BuildActualizationInfo(portion, now);
if (!info) {
return;
}
AFL_VERIFY(PortionIdByWaitDuration[info->GetAddress()].AddPortion(*info, portion.GetPortionId(), now));
auto address = info->GetAddress();
TFindActualizationInfo findId(std::move(address), info->GetWaitInstant(now));
AFL_VERIFY(PortionsInfo.emplace(portion.GetPortionId(), std::move(findId)).second);
}
void AddPortionImpl(const TPortionInfo& portion, const TInstant now);

virtual void DoAddPortion(const TPortionInfo& portion, const TAddExternalContext& addContext) override;
virtual void DoRemovePortion(const ui64 portionId) override;
virtual void DoExtractTasks(TTieringProcessContext& tasksContext, const TExternalTasksContext& externalContext, TInternalTasksContext& internalContext) override;
public:
void ActualizePortionInfo(const TPortionDataAccessor& accessor, const TActualizationContext& context);
std::optional<TCSMetadataRequest> BuildMetadataRequest(
std::vector<TCSMetadataRequest> BuildMetadataRequests(
const ui64 pathId, const THashMap<ui64, TPortionInfo::TPtr>& portions, const std::shared_ptr<TTieringActualizer>& index);

void Refresh(const std::optional<TTiering>& info, const TAddExternalContext& externalContext);
Expand Down
10 changes: 10 additions & 0 deletions ydb/core/tx/columnshard/hooks/abstract/abstract.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ namespace NKikimr::NOlap {
class TColumnEngineChanges;
class IBlobsGCAction;
class TPortionInfo;
class TDataAccessorsResult;
namespace NIndexes {
class TIndexMetaContainer;
}
Expand Down Expand Up @@ -106,6 +107,9 @@ class ICSController {
virtual ui64 DoGetRejectMemoryIntervalLimit(const ui64 defaultValue) const {
return defaultValue;
}
virtual ui64 DoGetMetadataRequestSoftMemoryLimit(const ui64 defaultValue) const {
return defaultValue;
}
virtual ui64 DoGetReadSequentiallyBufferSize(const ui64 defaultValue) const {
return defaultValue;
}
Expand Down Expand Up @@ -208,6 +212,10 @@ class ICSController {
const ui64 defaultValue = NOlap::TGlobalLimits::DefaultRejectMemoryIntervalLimit;
return DoGetRejectMemoryIntervalLimit(defaultValue);
}
ui64 GetMetadataRequestSoftMemoryLimit() const {
const ui64 defaultValue = 100 * (1 << 20);
return DoGetMetadataRequestSoftMemoryLimit(defaultValue);
}
virtual bool NeedForceCompactionBacketsConstruction() const {
return false;
}
Expand All @@ -234,6 +242,8 @@ class ICSController {
}
virtual void OnPortionActualization(const NOlap::TPortionInfo& /*info*/) {
}
virtual void OnTieringMetadataActualized() {
}
virtual void OnMaxValueUsage() {
}

Expand Down
3 changes: 3 additions & 0 deletions ydb/core/tx/columnshard/hooks/testing/controller.h
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,9 @@ class TController: public TReadOnlyController {
virtual ui64 DoGetRejectMemoryIntervalLimit(const ui64 def) const override {
return OverrideRejectMemoryIntervalLimit.value_or(def);
}
virtual ui64 DoGetMetadataRequestSoftMemoryLimit(const ui64 def) const override {
return 0;
}
virtual EOptimizerCompactionWeightControl GetCompactionControl() const override {
return CompactionControl;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -399,7 +399,7 @@ void TTestSchema::InitSchema(const std::vector<NArrow::NTest::TTestColumn>& colu

for (ui32 i = 0; i < columns.size(); ++i) {
*schema->MutableColumns()->Add() = columns[i].CreateColumn(i + 1);
if (!specials.NeedTestStatistics()) {
if (!specials.NeedTestStatistics(pk)) {
continue;
}
if (NOlap::NIndexes::NMax::TIndexMeta::IsAvailableType(columns[i].GetType())) {
Expand Down
Loading

0 comments on commit 8a88bab

Please sign in to comment.