diff --git a/ydb/core/fq/libs/row_dispatcher/format_handler/ut/format_handler_ut.cpp b/ydb/core/fq/libs/row_dispatcher/format_handler/ut/format_handler_ut.cpp index a7a79a666e89..9137efbfd93f 100644 --- a/ydb/core/fq/libs/row_dispatcher/format_handler/ut/format_handler_ut.cpp +++ b/ydb/core/fq/libs/row_dispatcher/format_handler/ut/format_handler_ut.cpp @@ -5,7 +5,7 @@ namespace NFq::NRowDispatcher::NTests { namespace { -class TFormatHadlerFixture : public TBaseFixture { +class TFormatHandlerFixture : public TBaseFixture { public: using TBase = TBaseFixture; using TCallback = std::function>>&& data)>; @@ -250,7 +250,7 @@ class TFormatHadlerFixture : public TBaseFixture { Y_UNIT_TEST_SUITE(TestFormatHandler) { - Y_UNIT_TEST_F(ManyJsonClients, TFormatHadlerFixture) { + Y_UNIT_TEST_F(ManyJsonClients, TFormatHandlerFixture) { const ui64 firstOffset = 42; const TSchemaColumn commonColumn = {"com_col", "[DataType; String]"}; @@ -278,7 +278,7 @@ Y_UNIT_TEST_SUITE(TestFormatHandler) { }); } - Y_UNIT_TEST_F(ManyRawClients, TFormatHadlerFixture) { + Y_UNIT_TEST_F(ManyRawClients, TFormatHandlerFixture) { CreateFormatHandler( {.JsonParserConfig = {}, .FiltersConfig = {.CompileServiceId = CompileService}}, {.ParsingFormat = "raw"} @@ -320,7 +320,7 @@ Y_UNIT_TEST_SUITE(TestFormatHandler) { }); } - Y_UNIT_TEST_F(ClientValidation, TFormatHadlerFixture) { + Y_UNIT_TEST_F(ClientValidation, TFormatHandlerFixture) { const TVector schema = {{"data", "[DataType; String]"}}; const TString filter = "WHERE FALSE"; const auto callback = EmptyCheck(); @@ -345,7 +345,7 @@ Y_UNIT_TEST_SUITE(TestFormatHandler) { ); } - Y_UNIT_TEST_F(ClientError, TFormatHadlerFixture) { + Y_UNIT_TEST_F(ClientError, TFormatHandlerFixture) { const ui64 firstOffset = 42; const TSchemaColumn commonColumn = {"com_col", "[DataType; String]"}; @@ -365,7 +365,7 @@ Y_UNIT_TEST_SUITE(TestFormatHandler) { ); } - Y_UNIT_TEST_F(ClientErrorWithEmptyFilter, TFormatHadlerFixture) { + Y_UNIT_TEST_F(ClientErrorWithEmptyFilter, TFormatHandlerFixture) { const ui64 firstOffset = 42; const TSchemaColumn commonColumn = {"com_col", "[DataType; String]"}; diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h index 40d823d4182e..3330016375ad 100644 --- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h +++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h @@ -42,6 +42,29 @@ namespace NYql { namespace NDq { +namespace { + +struct TEvPrivate { + enum EEv : ui32 { + EvRuntimeError = EventSpaceBegin(NActors::TEvents::ES_PRIVATE), + EvEnd + }; + + static_assert(EvEnd < EventSpaceEnd(NActors::TEvents::ES_PRIVATE), "expect EvEnd < EventSpaceEnd(TEvents::ES_PRIVATE)"); + + struct TEvAsyncOutputError : public NActors::TEventLocal { + TEvAsyncOutputError(NYql::NDqProto::StatusIds::StatusCode statusCode, const TIssues& issues) + : StatusCode(statusCode) + , Issues(issues) + {} + + NYql::NDqProto::StatusIds::StatusCode StatusCode; + NYql::TIssues Issues; + }; +}; + +} // anonymous namespace + struct TSinkCallbacks : public IDqComputeActorAsyncOutput::ICallbacks { void OnAsyncOutputError(ui64 outputIndex, const TIssues& issues, NYql::NDqProto::StatusIds::StatusCode fatalCode) override final { OnSinkError(outputIndex, issues, fatalCode); @@ -296,6 +319,7 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped hFunc(NActors::TEvInterconnect::TEvNodeConnected, HandleExecuteBase); hFunc(IDqComputeActorAsyncInput::TEvNewAsyncInputDataArrived, OnNewAsyncInputDataArrived); hFunc(IDqComputeActorAsyncInput::TEvAsyncInputError, OnAsyncInputError); + hFunc(TEvPrivate::TEvAsyncOutputError, HandleAsyncOutputError); default: { CA_LOG_C("TDqComputeActorBase, unexpected event: " << ev->GetTypeRewrite() << " (" << GetEventTypeString(ev) << ")"); InternalError(NYql::NDqProto::StatusIds::INTERNAL_ERROR, TIssuesIds::DEFAULT_ERROR, TStringBuilder() << "Unexpected event: " << ev->GetTypeRewrite() << " (" << GetEventTypeString(ev) << ")"); @@ -1524,7 +1548,7 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped } CA_LOG_E("Sink[" << outputIndex << "] fatal error: " << issues.ToOneLineString()); - InternalError(fatalCode, issues); + this->Send(this->SelfId(), new TEvPrivate::TEvAsyncOutputError(fatalCode, issues)); } void OnOutputTransformError(ui64 outputIndex, const TIssues& issues, NYql::NDqProto::StatusIds::StatusCode fatalCode) override final { @@ -1534,7 +1558,11 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped } CA_LOG_E("OutputTransform[" << outputIndex << "] fatal error: " << issues.ToOneLineString()); - InternalError(fatalCode, issues); + this->Send(this->SelfId(), new TEvPrivate::TEvAsyncOutputError(fatalCode, issues)); + } + + void HandleAsyncOutputError(const TEvPrivate::TEvAsyncOutputError::TPtr& ev) { + InternalError(ev->Get()->StatusCode, ev->Get()->Issues); } bool AllAsyncOutputsFinished() const {