forked from ydb-platform/ydb
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
fix race on scan start with indexation cleaning blobs (ydb-platform#7968)
- Loading branch information
1 parent
a248aa6
commit 9359f79
Showing 7 changed files with 149 additions and 170 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
114 changes: 53 additions & 61 deletions
114
ydb/core/tx/columnshard/engines/reader/transaction/tx_internal_scan.cpp
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,94 +1,86 @@ | ||
#include "tx_internal_scan.h" | ||
#include <ydb/core/tx/columnshard/engines/reader/actor/actor.h> | ||
#include <ydb/core/tx/columnshard/engines/reader/sys_view/constructor/constructor.h> | ||
#include <ydb/core/tx/columnshard/engines/reader/plain_reader/constructor/constructor.h> | ||
|
||
#include <ydb/core/formats/arrow/arrow_batch_builder.h> | ||
#include <ydb/core/sys_view/common/schema.h> | ||
#include <ydb/core/tx/columnshard/engines/reader/actor/actor.h> | ||
#include <ydb/core/tx/columnshard/engines/reader/plain_reader/constructor/constructor.h> | ||
#include <ydb/core/tx/columnshard/engines/reader/sys_view/abstract/policy.h> | ||
#include <ydb/core/tx/columnshard/engines/reader/sys_view/constructor/constructor.h> | ||
|
||
namespace NKikimr::NOlap::NReader { | ||
|
||
bool TTxInternalScan::Execute(TTransactionContext& /*txc*/, const TActorContext& /*ctx*/) { | ||
TMemoryProfileGuard mpg("TTxInternalScan::Execute"); | ||
void TTxInternalScan::SendError(const TString& problem, const TString& details, const TActorContext& ctx) const { | ||
AFL_WARN(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "TTxScan failed")("problem", problem)("details", details); | ||
auto& request = *InternalScanEvent->Get(); | ||
const TSnapshot snapshot = request.ReadToSnapshot.value_or(NOlap::TSnapshot(Self->LastPlannedStep, Self->LastPlannedTxId)); | ||
auto scanComputeActor = InternalScanEvent->Sender; | ||
|
||
TReadDescription read(snapshot, request.GetReverse()); | ||
read.PathId = request.GetPathId(); | ||
read.ReadNothing = !Self->TablesManager.HasTable(read.PathId); | ||
std::unique_ptr<IScannerConstructor> scannerConstructor(new NPlain::TIndexScannerConstructor(snapshot, request.GetItemsLimit(), request.GetReverse())); | ||
read.ColumnIds = request.GetColumnIds(); | ||
read.ColumnNames = request.GetColumnNames(); | ||
if (request.RangesFilter) { | ||
read.PKRangesFilter = std::move(*request.RangesFilter); | ||
} | ||
auto ev = MakeHolder<NKqp::TEvKqpCompute::TEvScanError>(ScanGen, Self->TabletID()); | ||
ev->Record.SetStatus(Ydb::StatusIds::BAD_REQUEST); | ||
auto issue = NYql::YqlIssue({}, NYql::TIssuesIds::KIKIMR_BAD_REQUEST, | ||
TStringBuilder() << "Table " << request.GetPathId() << " (shard " << Self->TabletID() << ") scan failed, reason: " << problem << "/" | ||
<< details); | ||
NYql::IssueToMessage(issue, ev->Record.MutableIssues()->Add()); | ||
|
||
const TVersionedIndex* vIndex = Self->GetIndexOptional() ? &Self->GetIndexOptional()->GetVersionedIndex() : nullptr; | ||
AFL_VERIFY(vIndex); | ||
{ | ||
TProgramContainer pContainer; | ||
pContainer.OverrideProcessingColumns(read.ColumnNames); | ||
read.SetProgram(std::move(pContainer)); | ||
} | ||
ctx.Send(scanComputeActor, ev.Release()); | ||
} | ||
|
||
{ | ||
auto newRange = scannerConstructor->BuildReadMetadata(Self, read); | ||
if (!newRange) { | ||
ErrorDescription = newRange.GetErrorMessage(); | ||
ReadMetadataRange = nullptr; | ||
return true; | ||
} | ||
ReadMetadataRange = newRange.DetachResult(); | ||
} | ||
AFL_VERIFY(ReadMetadataRange); | ||
bool TTxInternalScan::Execute(TTransactionContext& /*txc*/, const TActorContext& /*ctx*/) { | ||
return true; | ||
} | ||
|
||
void TTxInternalScan::Complete(const TActorContext& ctx) { | ||
TMemoryProfileGuard mpg("TTxInternalScan::Complete"); | ||
|
||
auto& request = *InternalScanEvent->Get(); | ||
auto scanComputeActor = InternalScanEvent->Sender; | ||
const NActors::TLogContextGuard gLogging = NActors::TLogContextBuilder::Build()("tablet", Self->TabletID()); | ||
|
||
if (!ReadMetadataRange) { | ||
AFL_WARN(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "TTxScan failed")("reason", "no metadata")("error", ErrorDescription); | ||
const TSnapshot snapshot = request.ReadToSnapshot.value_or(NOlap::TSnapshot(Self->LastPlannedStep, Self->LastPlannedTxId)); | ||
const NActors::TLogContextGuard gLogging = | ||
NActors::TLogContextBuilder::Build()("tablet", Self->TabletID())("snapshot", snapshot.DebugString()); | ||
TReadMetadataPtr readMetadataRange; | ||
{ | ||
TReadDescription read(snapshot, request.GetReverse()); | ||
read.PathId = request.GetPathId(); | ||
read.ReadNothing = !Self->TablesManager.HasTable(read.PathId); | ||
std::unique_ptr<IScannerConstructor> scannerConstructor( | ||
new NPlain::TIndexScannerConstructor(snapshot, request.GetItemsLimit(), request.GetReverse())); | ||
read.ColumnIds = request.GetColumnIds(); | ||
read.ColumnNames = request.GetColumnNames(); | ||
if (request.RangesFilter) { | ||
read.PKRangesFilter = std::move(*request.RangesFilter); | ||
} | ||
|
||
auto ev = MakeHolder<NKqp::TEvKqpCompute::TEvScanError>(ScanGen, Self->TabletID()); | ||
ev->Record.SetStatus(Ydb::StatusIds::BAD_REQUEST); | ||
auto issue = NYql::YqlIssue({}, NYql::TIssuesIds::KIKIMR_BAD_REQUEST, TStringBuilder() | ||
<< "Table " << request.GetPathId() << " (shard " << Self->TabletID() << ") scan failed, reason: " << ErrorDescription ? ErrorDescription : "no metadata ranges"); | ||
NYql::IssueToMessage(issue, ev->Record.MutableIssues()->Add()); | ||
const TVersionedIndex* vIndex = Self->GetIndexOptional() ? &Self->GetIndexOptional()->GetVersionedIndex() : nullptr; | ||
AFL_VERIFY(vIndex); | ||
{ | ||
TProgramContainer pContainer; | ||
pContainer.OverrideProcessingColumns(read.ColumnNames); | ||
read.SetProgram(std::move(pContainer)); | ||
} | ||
|
||
ctx.Send(scanComputeActor, ev.Release()); | ||
return; | ||
{ | ||
auto newRange = scannerConstructor->BuildReadMetadata(Self, read); | ||
if (!newRange) { | ||
return SendError("cannot create read metadata", newRange.GetErrorMessage(), ctx); | ||
} | ||
readMetadataRange = TValidator::CheckNotNull(newRange.DetachResult()); | ||
} | ||
} | ||
|
||
TStringBuilder detailedInfo; | ||
if (IS_LOG_PRIORITY_ENABLED(NActors::NLog::PRI_TRACE, NKikimrServices::TX_COLUMNSHARD)) { | ||
detailedInfo << " read metadata: (" << *ReadMetadataRange << ")"; | ||
detailedInfo << " read metadata: (" << *readMetadataRange << ")"; | ||
} | ||
|
||
const TVersionedIndex* index = nullptr; | ||
if (Self->HasIndex()) { | ||
index = &Self->GetIndexAs<TColumnEngineForLogs>().GetVersionedIndex(); | ||
} | ||
const TConclusion<ui64> requestCookie = Self->InFlightReadsTracker.AddInFlightRequest(ReadMetadataRange, index); | ||
if (!requestCookie) { | ||
AFL_ERROR(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "TTxScan failed")("reason", requestCookie.GetErrorMessage())("trace_details", detailedInfo); | ||
auto ev = MakeHolder<NKqp::TEvKqpCompute::TEvScanError>(ScanGen, Self->TabletID()); | ||
|
||
ev->Record.SetStatus(Ydb::StatusIds::INTERNAL_ERROR); | ||
auto issue = NYql::YqlIssue({}, NYql::TIssuesIds::KIKIMR_TEMPORARILY_UNAVAILABLE, TStringBuilder() | ||
<< "Table " << request.GetPathId() << " (shard " << Self->TabletID() << ") scan failed, reason: " << requestCookie.GetErrorMessage()); | ||
NYql::IssueToMessage(issue, ev->Record.MutableIssues()->Add()); | ||
Self->Counters.GetScanCounters().OnScanFinished(NColumnShard::TScanCounters::EStatusFinish::CannotAddInFlight, TDuration::Zero()); | ||
ctx.Send(scanComputeActor, ev.Release()); | ||
return; | ||
} | ||
auto scanActor = ctx.Register(new TColumnShardScan(Self->SelfId(), scanComputeActor, Self->GetStoragesManager(), | ||
TComputeShardingPolicy(), ScanId, TxId, ScanGen, *requestCookie, Self->TabletID(), TDuration::Max(), ReadMetadataRange, | ||
NKikimrDataEvents::FORMAT_ARROW, Self->Counters.GetScanCounters())); | ||
const ui64 requestCookie = Self->InFlightReadsTracker.AddInFlightRequest(readMetadataRange, index); | ||
auto scanActor = ctx.Register(new TColumnShardScan(Self->SelfId(), scanComputeActor, Self->GetStoragesManager(), TComputeShardingPolicy(), | ||
ScanId, TxId, ScanGen, requestCookie, Self->TabletID(), TDuration::Max(), readMetadataRange, NKikimrDataEvents::FORMAT_ARROW, | ||
Self->Counters.GetScanCounters())); | ||
|
||
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "TTxInternalScan started")("actor_id", scanActor)("trace_detailed", detailedInfo); | ||
} | ||
|
||
} | ||
} // namespace NKikimr::NOlap::NReader |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.