Skip to content

Commit

Permalink
Dynamic deadline for CS scan (#9520)
Browse files Browse the repository at this point in the history
  • Loading branch information
swalrus1 authored Sep 20, 2024
1 parent 631a781 commit 6c407dc
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 4 deletions.
24 changes: 21 additions & 3 deletions ydb/core/tx/columnshard/engines/reader/actor/actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ TColumnShardScan::TColumnShardScan(const TActorId& columnShardActorId, const TAc
, DataFormat(dataFormat)
, TabletId(tabletId)
, ReadMetadataRange(readMetadataRange)
, Deadline(TInstant::Now() + (timeout ? timeout + SCAN_HARD_TIMEOUT_GAP : SCAN_HARD_TIMEOUT))
, Timeout(timeout ? timeout + SCAN_HARD_TIMEOUT_GAP : SCAN_HARD_TIMEOUT)
, ScanCountersPool(scanCountersPool)
, Stats(NTracing::TTraceClient::GetLocalClient("SHARD", ::ToString(TabletId)/*, "SCAN_TXID:" + ::ToString(TxId)*/))
, ComputeShardingPolicy(computeShardingPolicy) {
Expand All @@ -72,7 +72,6 @@ void TColumnShardScan::Bootstrap(const TActorContext& ctx) {
);
auto g = Stats->MakeGuard("bootstrap");
ScanActorId = ctx.SelfID;
Schedule(Deadline, new TEvents::TEvWakeup);

Y_ABORT_UNLESS(!ScanIterator);
ResourceSubscribeActorId = ctx.Register(new NResourceBroker::NSubscribe::TActor(TabletId, SelfId()));
Expand All @@ -88,6 +87,7 @@ void TColumnShardScan::Bootstrap(const TActorContext& ctx) {
SendScanError("scanner_start_error:" + startResult.GetErrorMessage());
Finish(NColumnShard::TScanCounters::EStatusFinish::ProblemOnStart);
} else {
ScheduleWakeup(GetDeadline());

// propagate self actor id // TODO: FlagSubscribeOnSession ?
Send(ScanComputeActorId, new NKqp::TEvKqpCompute::TEvScanInitActor(ScanId, ctx.SelfID, ScanGen, TabletId), IEventHandle::FlagTrackDelivery);
Expand Down Expand Up @@ -176,7 +176,11 @@ void TColumnShardScan::HandleScan(TEvents::TEvWakeup::TPtr& /*ev*/) {
"Scan " << ScanActorId << " guard execution timeout"
<< " txId: " << TxId << " scanId: " << ScanId << " gen: " << ScanGen << " tablet: " << TabletId);

Finish(NColumnShard::TScanCounters::EStatusFinish::Deadline);
if (TMonotonic::Now() >= GetDeadline()) {
Finish(NColumnShard::TScanCounters::EStatusFinish::Deadline);
} else {
ScheduleWakeup(GetDeadline());
}
}

bool TColumnShardScan::ProduceResults() noexcept {
Expand Down Expand Up @@ -377,6 +381,7 @@ bool TColumnShardScan::SendResult(bool pageFault, bool lastBatch) {
}
ReadMetadataRange->OnReplyConstruction(TabletId, *Result);
AckReceivedInstant.reset();
LastResultInstant = TMonotonic::Now();

Send(ScanComputeActorId, Result.Release(), IEventHandle::FlagTrackDelivery); // TODO: FlagSubscribeOnSession ?

Expand Down Expand Up @@ -414,4 +419,17 @@ void TColumnShardScan::ReportStats() {
Bytes = 0;
}

void TColumnShardScan::ScheduleWakeup(const TMonotonic deadline) {
if (deadline != TMonotonic::Max()) {
Schedule(deadline, new TEvents::TEvWakeup);
}
}

TMonotonic TColumnShardScan::GetDeadline() const {
AFL_VERIFY(StartInstant);
if (LastResultInstant) {
return *LastResultInstant + Timeout;
}
return *StartInstant + Timeout;
}
}
7 changes: 6 additions & 1 deletion ydb/core/tx/columnshard/engines/reader/actor/actor.h
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,10 @@ class TColumnShardScan: public TActorBootstrapped<TColumnShardScan>, NArrow::IRo

void ReportStats();

void ScheduleWakeup(const TMonotonic deadline);

TMonotonic GetDeadline() const;

private:
const TActorId ColumnShardActorId;
const TActorId ReadBlobsActorId;
Expand All @@ -122,7 +126,7 @@ class TColumnShardScan: public TActorBootstrapped<TColumnShardScan>, NArrow::IRo
std::vector<std::pair<TString, NScheme::TTypeInfo>> KeyYqlSchema;
const TSerializedTableRange TableRange;
const TSmallVec<bool> SkipNullKeys;
const TInstant Deadline;
const TDuration Timeout;
NColumnShard::TConcreteScanCounters ScanCountersPool;

TMaybe<TString> AbortReason;
Expand All @@ -132,6 +136,7 @@ class TColumnShardScan: public TActorBootstrapped<TColumnShardScan>, NArrow::IRo
std::shared_ptr<arrow::RecordBatch> CurrentLastReadKey;
i64 InFlightReads = 0;
bool Finished = false;
std::optional<TMonotonic> LastResultInstant;

class TBlobStats {
private:
Expand Down

0 comments on commit 6c407dc

Please sign in to comment.