Skip to content

Commit

Permalink
Fixed fault on sink error
Browse files Browse the repository at this point in the history
  • Loading branch information
GrigoriyPA committed Feb 11, 2025
1 parent 8c6a599 commit 19b4233
Showing 1 changed file with 30 additions and 2 deletions.
32 changes: 30 additions & 2 deletions ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,29 @@
namespace NYql {
namespace NDq {

namespace {

struct TEvPrivate {
enum EEv : ui32 {
EvRuntimeError = EventSpaceBegin(NActors::TEvents::ES_PRIVATE),
EvEnd
};

static_assert(EvEnd < EventSpaceEnd(NActors::TEvents::ES_PRIVATE), "expect EvEnd < EventSpaceEnd(TEvents::ES_PRIVATE)");

struct TEvAsyncOutputError : public NActors::TEventLocal<TEvAsyncOutputError, EvRuntimeError> {
TEvAsyncOutputError(NYql::NDqProto::StatusIds::StatusCode statusCode, const TIssues& issues)
: StatusCode(statusCode)
, Issues(issues)
{}

NYql::NDqProto::StatusIds::StatusCode StatusCode;
NYql::TIssues Issues;
};
};

} // anonymous namespace

struct TSinkCallbacks : public IDqComputeActorAsyncOutput::ICallbacks {
void OnAsyncOutputError(ui64 outputIndex, const TIssues& issues, NYql::NDqProto::StatusIds::StatusCode fatalCode) override final {
OnSinkError(outputIndex, issues, fatalCode);
Expand Down Expand Up @@ -296,6 +319,7 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped<TDerived>
hFunc(NActors::TEvInterconnect::TEvNodeConnected, HandleExecuteBase);
hFunc(IDqComputeActorAsyncInput::TEvNewAsyncInputDataArrived, OnNewAsyncInputDataArrived);
hFunc(IDqComputeActorAsyncInput::TEvAsyncInputError, OnAsyncInputError);
hFunc(TEvPrivate::TEvAsyncOutputError, HadleAsyncOutputError);
default: {
CA_LOG_C("TDqComputeActorBase, unexpected event: " << ev->GetTypeRewrite() << " (" << GetEventTypeString(ev) << ")");
InternalError(NYql::NDqProto::StatusIds::INTERNAL_ERROR, TIssuesIds::DEFAULT_ERROR, TStringBuilder() << "Unexpected event: " << ev->GetTypeRewrite() << " (" << GetEventTypeString(ev) << ")");
Expand Down Expand Up @@ -1524,7 +1548,7 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped<TDerived>
}

CA_LOG_E("Sink[" << outputIndex << "] fatal error: " << issues.ToOneLineString());
InternalError(fatalCode, issues);
this->Send(this->SelfId(), new TEvPrivate::TEvAsyncOutputError(fatalCode, issues));
}

void OnOutputTransformError(ui64 outputIndex, const TIssues& issues, NYql::NDqProto::StatusIds::StatusCode fatalCode) override final {
Expand All @@ -1534,7 +1558,11 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped<TDerived>
}

CA_LOG_E("OutputTransform[" << outputIndex << "] fatal error: " << issues.ToOneLineString());
InternalError(fatalCode, issues);
this->Send(this->SelfId(), new TEvPrivate::TEvAsyncOutputError(fatalCode, issues));
}

void HadleAsyncOutputError(const TEvPrivate::TEvAsyncOutputError::TPtr& ev) {
InternalError(ev->Get()->StatusCode, ev->Get()->Issues);
}

bool AllAsyncOutputsFinished() const {
Expand Down

0 comments on commit 19b4233

Please sign in to comment.