Skip to content

Commit

Permalink
YQ-4104 fixed fault on sink error (#14457)
Browse files Browse the repository at this point in the history
  • Loading branch information
GrigoriyPA authored Feb 12, 2025
1 parent cdbb0bc commit 2c20d41
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ namespace NFq::NRowDispatcher::NTests {

namespace {

class TFormatHadlerFixture : public TBaseFixture {
class TFormatHandlerFixture : public TBaseFixture {
public:
using TBase = TBaseFixture;
using TCallback = std::function<void(TQueue<std::pair<TRope, TVector<ui64>>>&& data)>;
Expand Down Expand Up @@ -250,7 +250,7 @@ class TFormatHadlerFixture : public TBaseFixture {


Y_UNIT_TEST_SUITE(TestFormatHandler) {
Y_UNIT_TEST_F(ManyJsonClients, TFormatHadlerFixture) {
Y_UNIT_TEST_F(ManyJsonClients, TFormatHandlerFixture) {
const ui64 firstOffset = 42;
const TSchemaColumn commonColumn = {"com_col", "[DataType; String]"};

Expand Down Expand Up @@ -278,7 +278,7 @@ Y_UNIT_TEST_SUITE(TestFormatHandler) {
});
}

Y_UNIT_TEST_F(ManyRawClients, TFormatHadlerFixture) {
Y_UNIT_TEST_F(ManyRawClients, TFormatHandlerFixture) {
CreateFormatHandler(
{.JsonParserConfig = {}, .FiltersConfig = {.CompileServiceId = CompileService}},
{.ParsingFormat = "raw"}
Expand Down Expand Up @@ -320,7 +320,7 @@ Y_UNIT_TEST_SUITE(TestFormatHandler) {
});
}

Y_UNIT_TEST_F(ClientValidation, TFormatHadlerFixture) {
Y_UNIT_TEST_F(ClientValidation, TFormatHandlerFixture) {
const TVector<TSchemaColumn> schema = {{"data", "[DataType; String]"}};
const TString filter = "WHERE FALSE";
const auto callback = EmptyCheck();
Expand All @@ -345,7 +345,7 @@ Y_UNIT_TEST_SUITE(TestFormatHandler) {
);
}

Y_UNIT_TEST_F(ClientError, TFormatHadlerFixture) {
Y_UNIT_TEST_F(ClientError, TFormatHandlerFixture) {
const ui64 firstOffset = 42;
const TSchemaColumn commonColumn = {"com_col", "[DataType; String]"};

Expand All @@ -365,7 +365,7 @@ Y_UNIT_TEST_SUITE(TestFormatHandler) {
);
}

Y_UNIT_TEST_F(ClientErrorWithEmptyFilter, TFormatHadlerFixture) {
Y_UNIT_TEST_F(ClientErrorWithEmptyFilter, TFormatHandlerFixture) {
const ui64 firstOffset = 42;
const TSchemaColumn commonColumn = {"com_col", "[DataType; String]"};

Expand Down
32 changes: 30 additions & 2 deletions ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,29 @@
namespace NYql {
namespace NDq {

namespace {

struct TEvPrivate {
enum EEv : ui32 {
EvRuntimeError = EventSpaceBegin(NActors::TEvents::ES_PRIVATE),
EvEnd
};

static_assert(EvEnd < EventSpaceEnd(NActors::TEvents::ES_PRIVATE), "expect EvEnd < EventSpaceEnd(TEvents::ES_PRIVATE)");

struct TEvAsyncOutputError : public NActors::TEventLocal<TEvAsyncOutputError, EvRuntimeError> {
TEvAsyncOutputError(NYql::NDqProto::StatusIds::StatusCode statusCode, const TIssues& issues)
: StatusCode(statusCode)
, Issues(issues)
{}

NYql::NDqProto::StatusIds::StatusCode StatusCode;
NYql::TIssues Issues;
};
};

} // anonymous namespace

struct TSinkCallbacks : public IDqComputeActorAsyncOutput::ICallbacks {
void OnAsyncOutputError(ui64 outputIndex, const TIssues& issues, NYql::NDqProto::StatusIds::StatusCode fatalCode) override final {
OnSinkError(outputIndex, issues, fatalCode);
Expand Down Expand Up @@ -296,6 +319,7 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped<TDerived>
hFunc(NActors::TEvInterconnect::TEvNodeConnected, HandleExecuteBase);
hFunc(IDqComputeActorAsyncInput::TEvNewAsyncInputDataArrived, OnNewAsyncInputDataArrived);
hFunc(IDqComputeActorAsyncInput::TEvAsyncInputError, OnAsyncInputError);
hFunc(TEvPrivate::TEvAsyncOutputError, HandleAsyncOutputError);
default: {
CA_LOG_C("TDqComputeActorBase, unexpected event: " << ev->GetTypeRewrite() << " (" << GetEventTypeString(ev) << ")");
InternalError(NYql::NDqProto::StatusIds::INTERNAL_ERROR, TIssuesIds::DEFAULT_ERROR, TStringBuilder() << "Unexpected event: " << ev->GetTypeRewrite() << " (" << GetEventTypeString(ev) << ")");
Expand Down Expand Up @@ -1524,7 +1548,7 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped<TDerived>
}

CA_LOG_E("Sink[" << outputIndex << "] fatal error: " << issues.ToOneLineString());
InternalError(fatalCode, issues);
this->Send(this->SelfId(), new TEvPrivate::TEvAsyncOutputError(fatalCode, issues));
}

void OnOutputTransformError(ui64 outputIndex, const TIssues& issues, NYql::NDqProto::StatusIds::StatusCode fatalCode) override final {
Expand All @@ -1534,7 +1558,11 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped<TDerived>
}

CA_LOG_E("OutputTransform[" << outputIndex << "] fatal error: " << issues.ToOneLineString());
InternalError(fatalCode, issues);
this->Send(this->SelfId(), new TEvPrivate::TEvAsyncOutputError(fatalCode, issues));
}

void HandleAsyncOutputError(const TEvPrivate::TEvAsyncOutputError::TPtr& ev) {
InternalError(ev->Get()->StatusCode, ev->Get()->Issues);
}

bool AllAsyncOutputsFinished() const {
Expand Down

0 comments on commit 2c20d41

Please sign in to comment.