Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

YQ-2884 fixed fq cancel operation race #2055

Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 38 additions & 9 deletions ydb/core/fq/libs/compute/ydb/ydb_run_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -98,10 +98,14 @@ class TYdbRunActor : public NActors::TActorBootstrapped<TYdbRunActor> {
}

void Handle(const TEvYdbCompute::TEvStatusTrackerResponse::TPtr& ev) {
if (CancelOperationIsRunning("StatusTrackerResponse (aborting). ")) {
return;
}

auto& response = *ev->Get();
if (response.Status == NYdb::EStatus::NOT_FOUND) { // FAILING / ABORTING_BY_USER / ABORTING_BY_SYSTEM
LOG_I("StatusTrackerResponse (not found). Status: " << response.Status << " Issues: " << response.Issues.ToOneLineString());
Register(ActorFactory->CreateFinalizer(Params, SelfId(), Pinger, ExecStatus, Params.Status).release());
CreateFinalizer(Params.Status);
return;
}

Expand All @@ -116,19 +120,23 @@ class TYdbRunActor : public NActors::TActorBootstrapped<TYdbRunActor> {
if (response.ExecStatus == NYdb::NQuery::EExecStatus::Completed) {
Register(ActorFactory->CreateResultWriter(SelfId(), Connector, Pinger, Params.OperationId).release());
} else {
Register(ActorFactory->CreateResourcesCleaner(SelfId(), Connector, Params.OperationId).release());
CreateResourcesCleaner();
}
}

// Handles the reply from the result writer.
// - If a cancel operation is already running, the reply is ignored.
// - On a non-SUCCESS status the actor resigns and passes away with the reported issues.
// - On success the resources cleaner is started.
void Handle(const TEvYdbCompute::TEvResultWriterResponse::TPtr& ev) {
    // CancelOperationIsRunning logs the given stage prefix and returns true when IsAborted is set.
    if (CancelOperationIsRunning("ResultWriterResponse (aborting). ")) {
        return;
    }

    auto& response = *ev->Get();
    if (response.Status != NYdb::EStatus::SUCCESS) {
        LOG_I("ResultWriterResponse (failed). Status: " << response.Status << " Issues: " << response.Issues.ToOneLineString());
        ResignAndPassAway(response.Issues);
        return;
    }
    LOG_I("ResultWriterResponse (success) " << response.Status << " Issues: " << response.Issues.ToOneLineString());
    // Start the cleaner exactly once, via the helper that also sets FinalizationStarted.
    // The former direct Register(...CreateResourcesCleaner...) call is dropped: keeping both
    // would register two cleaners for the same operation.
    CreateResourcesCleaner();
}

void Handle(const TEvYdbCompute::TEvResourcesCleanerResponse::TPtr& ev) {
Expand All @@ -139,20 +147,21 @@ class TYdbRunActor : public NActors::TActorBootstrapped<TYdbRunActor> {
return;
}
LOG_I("ResourcesCleanerResponse (success) " << response.Status << " Issues: " << response.Issues.ToOneLineString());
Register(ActorFactory->CreateFinalizer(Params, SelfId(), Pinger, ExecStatus, IsAborted ? FederatedQuery::QueryMeta::ABORTING_BY_USER : Params.Status).release());
CreateFinalizer(IsAborted ? FederatedQuery::QueryMeta::ABORTING_BY_USER : Params.Status);
}

// Handles the reply from the finalizer: logs the outcome once and finishes the actor,
// regardless of whether the finalizer reported success or failure.
void Handle(const TEvYdbCompute::TEvFinalizerResponse::TPtr ev) {
    // Pinger is no longer available at this place.
    // The query can be restarted only after the expiration of lease in case of error
    auto& response = *ev->Get();
    // Only the corrected message (with balanced spacing around the parentheses) is kept;
    // emitting both variants would log the same event twice.
    LOG_I("FinalizerResponse ( " << (response.Status == NYdb::EStatus::SUCCESS ? "success" : "failed") << " ) " << response.Status << " Issues: " << response.Issues.ToOneLineString());
    FinishAndPassAway();
}

void Handle(TEvents::TEvQueryActionResult::TPtr& ev) {
LOG_I("QueryActionResult: " << FederatedQuery::QueryAction_Name(ev->Get()->Action));
if (Params.OperationId.GetKind() != Ydb::TOperationId::UNUSED && !IsAborted) {
// Start cancel operation only when StatusTracker or ResultWriter is running
if (Params.OperationId.GetKind() != Ydb::TOperationId::UNUSED && !IsAborted && !FinalizationStarted) {
IsAborted = true;
Register(ActorFactory->CreateStopper(SelfId(), Connector, Params.OperationId).release());
}
Expand All @@ -166,7 +175,7 @@ class TYdbRunActor : public NActors::TActorBootstrapped<TYdbRunActor> {
return;
}
LOG_I("StopperResponse (success) " << response.Status << " Issues: " << response.Issues.ToOneLineString());
Register(ActorFactory->CreateResourcesCleaner(SelfId(), Connector, Params.OperationId).release());
CreateResourcesCleaner();
}

void Run() { // recover points
Expand All @@ -185,7 +194,7 @@ class TYdbRunActor : public NActors::TActorBootstrapped<TYdbRunActor> {
if (Params.OperationId.GetKind() != Ydb::TOperationId::UNUSED) {
Register(ActorFactory->CreateResultWriter(SelfId(), Connector, Pinger, Params.OperationId).release());
} else {
Register(ActorFactory->CreateFinalizer(Params, SelfId(), Pinger, ExecStatus, Params.Status).release());
CreateFinalizer(Params.Status);
}
break;
case FederatedQuery::QueryMeta::FAILING:
Expand All @@ -194,7 +203,7 @@ class TYdbRunActor : public NActors::TActorBootstrapped<TYdbRunActor> {
if (Params.OperationId.GetKind() != Ydb::TOperationId::UNUSED) {
Register(ActorFactory->CreateStatusTracker(SelfId(), Connector, Pinger, Params.OperationId).release());
} else {
Register(ActorFactory->CreateFinalizer(Params, SelfId(), Pinger, ExecStatus, Params.Status).release());
CreateFinalizer(Params.Status);
}
break;
default:
Expand All @@ -220,8 +229,28 @@ class TYdbRunActor : public NActors::TActorBootstrapped<TYdbRunActor> {
PassAway();
}

// Marks finalization as started, then registers a resources cleaner actor
// for the current operation (Params.OperationId).
void CreateResourcesCleaner() {
    FinalizationStarted = true;
    auto cleaner = ActorFactory->CreateResourcesCleaner(SelfId(), Connector, Params.OperationId);
    Register(cleaner.release());
}

// Marks finalization as started, then registers a finalizer actor that is
// given the supplied compute status together with Params, Pinger and ExecStatus.
void CreateFinalizer(FederatedQuery::QueryMeta::ComputeStatus status) {
    FinalizationStarted = true;
    auto finalizer = ActorFactory->CreateFinalizer(Params, SelfId(), Pinger, ExecStatus, status);
    Register(finalizer.release());
}

// Reports whether IsAborted is set. When it is, logs a message prefixed with
// `stage` and returns true so the caller can stop processing its event;
// otherwise returns false without logging.
bool CancelOperationIsRunning(const TString& stage) const {
    if (IsAborted) {
        LOG_I(stage << "Stop task execution, cancel operation now is running");
        return true;
    }
    return false;
}

private:
// Set to true right before a Stopper actor is registered from the QueryActionResult handler;
// read by CancelOperationIsRunning and when choosing the status passed to the finalizer.
bool IsAborted = false;
// Set to true by CreateResourcesCleaner and CreateFinalizer; the QueryActionResult handler
// checks it so that a Stopper is not started once finalization has begun.
bool FinalizationStarted = false;
TActorId FetcherId;
NYdb::NQuery::EExecStatus ExecStatus = NYdb::NQuery::EExecStatus::Unspecified;
TRunActorParams Params;
Expand Down
Loading