Skip to content

Commit

Permalink
scan error processing on restore data (#7813) (#7877)
Browse files Browse the repository at this point in the history
Co-authored-by: ivanmorozov333 <ivanmorozov@ydb.tech>
  • Loading branch information
zverevgeny and ivanmorozov333 authored Aug 16, 2024
1 parent a2b0e30 commit 34f01ca
Show file tree
Hide file tree
Showing 4 changed files with 27 additions and 7 deletions.
5 changes: 4 additions & 1 deletion ydb/core/tx/columnshard/data_reader/actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,10 @@ void TActor::HandleExecute(NKqp::TEvKqpCompute::TEvScanInitActor::TPtr& ev) {
}

void TActor::HandleExecute(NKqp::TEvKqpCompute::TEvScanError::TPtr& ev) {
AFL_VERIFY(false)("error", NYql::IssuesFromMessageAsString(ev->Get()->Record.GetIssues()));
SwitchStage(EStage::WaitData, EStage::Finished);
AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("event", "problem_on_restore_data")(
"reason", NYql::IssuesFromMessageAsString(ev->Get()->Record.GetIssues()));
RestoreTask->OnError(NYql::IssuesFromMessageAsString(ev->Get()->Record.GetIssues()));
}

void TActor::Bootstrap(const TActorContext& /*ctx*/) {
Expand Down
5 changes: 5 additions & 0 deletions ydb/core/tx/columnshard/data_reader/actor.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ class IRestoreTask {
YDB_READONLY_DEF(NActors::TActorId, TabletActorId);
virtual TConclusionStatus DoOnDataChunk(const std::shared_ptr<arrow::Table>& data) = 0;
virtual TConclusionStatus DoOnFinished() = 0;
virtual void DoOnError(const TString& errorMessage) = 0;
virtual std::unique_ptr<TEvColumnShard::TEvInternalScan> DoBuildRequestInitiator() const = 0;

public:
Expand All @@ -24,6 +25,10 @@ class IRestoreTask {
return DoOnFinished();
}

void OnError(const TString& errorMessage) {
DoOnError(errorMessage);
}

std::unique_ptr<TEvColumnShard::TEvInternalScan> BuildRequestInitiator() const {
return DoBuildRequestInitiator();
}
Expand Down
21 changes: 15 additions & 6 deletions ydb/core/tx/columnshard/operations/batch_builder/restore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,17 +25,19 @@ std::unique_ptr<NKikimr::TEvColumnShard::TEvInternalScan> TModificationRestoreTa
NKikimr::TConclusionStatus TModificationRestoreTask::DoOnDataChunk(const std::shared_ptr<arrow::Table>& data) {
auto result = Merger->AddExistsDataOrdered(data);
if (result.IsFail()) {
auto writeDataPtr = std::make_shared<NEvWrite::TWriteData>(std::move(WriteData));
AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "restore_data_problems")
AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "merge_data_problems")
("write_id", WriteData.GetWriteMeta().GetWriteId())("tablet_id", TabletId)("message", result.GetErrorMessage());
TWritingBuffer buffer(writeDataPtr->GetBlobsAction(), { std::make_shared<TWriteAggregation>(*writeDataPtr) });
auto evResult = NColumnShard::TEvPrivate::TEvWriteBlobsResult::Error(NKikimrProto::EReplyStatus::CORRUPTED,
std::move(buffer), result.GetErrorMessage());
TActorContext::AsActorContext().Send(ParentActorId, evResult.release());
SendErrorMessage(result.GetErrorMessage());
}
return result;
}

void TModificationRestoreTask::DoOnError(const TString& errorMessage) {
AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("event", "restore_data_problems")("write_id", WriteData.GetWriteMeta().GetWriteId())(
"tablet_id", TabletId)("message", errorMessage);
SendErrorMessage(errorMessage);
}

NKikimr::TConclusionStatus TModificationRestoreTask::DoOnFinished() {
{
auto result = Merger->Finish();
Expand Down Expand Up @@ -65,4 +67,11 @@ TModificationRestoreTask::TModificationRestoreTask(const ui64 tabletId, const NA

}

void TModificationRestoreTask::SendErrorMessage(const TString& errorMessage) {
auto writeDataPtr = std::make_shared<NEvWrite::TWriteData>(std::move(WriteData));
TWritingBuffer buffer(writeDataPtr->GetBlobsAction(), { std::make_shared<TWriteAggregation>(*writeDataPtr) });
auto evResult = NColumnShard::TEvPrivate::TEvWriteBlobsResult::Error(NKikimrProto::EReplyStatus::CORRUPTED, std::move(buffer), errorMessage);
TActorContext::AsActorContext().Send(ParentActorId, evResult.release());
}

}
3 changes: 3 additions & 0 deletions ydb/core/tx/columnshard/operations/batch_builder/restore.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ class TModificationRestoreTask: public NDataReader::IRestoreTask {

virtual TConclusionStatus DoOnDataChunk(const std::shared_ptr<arrow::Table>& data) override;
virtual TConclusionStatus DoOnFinished() override;
virtual void DoOnError(const TString& errorMessage) override;
void SendErrorMessage(const TString& errorMessage);

public:
TModificationRestoreTask(const ui64 tabletId, const NActors::TActorId parentActorId,
const NActors::TActorId bufferActorId, NEvWrite::TWriteData&& writeData, const std::shared_ptr<IMerger>& merger,
Expand Down

0 comments on commit 34f01ca

Please sign in to comment.