From 7103d182bf6bfb9e7ffbc7dc976cb0f852ec6fbd Mon Sep 17 00:00:00 2001 From: Dmitry Kardymon Date: Fri, 2 Feb 2024 07:27:21 +0000 Subject: [PATCH 1/2] Exception check --- .../ydb_checkpoint_storage.cpp | 22 ++++++++++++++---- .../checkpoint_storage/ydb_state_storage.cpp | 15 +++++++++--- ydb/core/fq/libs/ydb/ut/ya.make | 12 ++++++++++ ydb/core/fq/libs/ydb/ut/ydb_ut.cpp | 23 +++++++++++++++++++ ydb/core/fq/libs/ydb/ya.make | 4 ++++ ydb/core/fq/libs/ydb/ydb.cpp | 8 ++++++- 6 files changed, 75 insertions(+), 9 deletions(-) create mode 100644 ydb/core/fq/libs/ydb/ut/ya.make create mode 100644 ydb/core/fq/libs/ydb/ut/ydb_ut.cpp diff --git a/ydb/core/fq/libs/checkpoint_storage/ydb_checkpoint_storage.cpp b/ydb/core/fq/libs/checkpoint_storage/ydb_checkpoint_storage.cpp index 3f4a228b93f2..61858bccd96e 100644 --- a/ydb/core/fq/libs/checkpoint_storage/ydb_checkpoint_storage.cpp +++ b/ydb/core/fq/libs/checkpoint_storage/ydb_checkpoint_storage.cpp @@ -805,10 +805,16 @@ TFuture TCheckpointStorage::CreateC return future.Apply( [checkpointContext](const TFuture& future) { - if (NYql::TIssues issues = StatusToIssues(future.GetValue())) { - return TCreateCheckpointResult(TString(), std::move(issues)); - } else { - return TCreateCheckpointResult(checkpointContext->CheckpointGraphDescriptionContext->GraphDescId, NYql::TIssues()); + try { + if (NYql::TIssues issues = StatusToIssues(future.GetValue())) { + return TCreateCheckpointResult(TString(), std::move(issues)); + } else { + return TCreateCheckpointResult(checkpointContext->CheckpointGraphDescriptionContext->GraphDescId, NYql::TIssues()); + } + } catch (...) { + TIssues issues; + issues.AddIssue(CurrentExceptionMessage()); + return TCreateCheckpointResult(TString(), issues); } }); } @@ -1099,7 +1105,13 @@ TFuture TCheckpointStor }); return future.Apply( [result](const TFuture& status) { - return std::make_pair(std::move(result->Size), std::move(status.GetValue().GetIssues())); + try { + return std::make_pair(std::move(result->Size), std::move(status.GetValue().GetIssues())); + } catch (...) { + TIssues issues; + issues.AddIssue(CurrentExceptionMessage()); + return ICheckpointStorage::TGetTotalCheckpointsStateSizeResult(0, issues); + } }); } diff --git a/ydb/core/fq/libs/checkpoint_storage/ydb_state_storage.cpp b/ydb/core/fq/libs/checkpoint_storage/ydb_state_storage.cpp index 1fe0e7794944..63086e244854 100644 --- a/ydb/core/fq/libs/checkpoint_storage/ydb_state_storage.cpp +++ b/ydb/core/fq/libs/checkpoint_storage/ydb_state_storage.cpp @@ -115,6 +115,7 @@ TFuture ProcessState( processed[taskIndex] = true; } LoadState(context->States[taskIndex], *parser.ColumnParser("blob").GetOptionalString()); + // throw std::runtime_error("ddddd"); } } else { errorMessage << "Not all states exist in database"; @@ -286,6 +287,7 @@ TFuture TStateStorage::GetState( return MakeFuture(result); } + auto context = MakeIntrusive( YdbConnection->TablePathPrefix, taskIds, @@ -366,11 +368,18 @@ TFuture TStateStorage::CountStates( [context] (const TFuture& future) { TCountStatesResult countResult; countResult.first = context->Count; - const auto& status = future.GetValue(); - if (!status.IsSuccess()) { - countResult.second = status.GetIssues(); + try { + const auto& status = future.GetValue(); + if (!status.IsSuccess()) { + countResult.second = status.GetIssues(); + } + } catch (...) { + TIssues issues; + issues.AddIssue(CurrentExceptionMessage()); + countResult.second = issues; } return countResult; + }); } TExecDataQuerySettings TStateStorage::DefaultExecDataQuerySettings() { diff --git a/ydb/core/fq/libs/ydb/ut/ya.make b/ydb/core/fq/libs/ydb/ut/ya.make new file mode 100644 index 000000000000..e96a95760f6c --- /dev/null +++ b/ydb/core/fq/libs/ydb/ut/ya.make @@ -0,0 +1,12 @@ +UNITTEST_FOR(ydb/core/fq/libs/ydb) + +SRCS( + ydb_ut.cpp +) + +PEERDIR( + ydb/core/fq/libs/ydb +) + +END() + diff --git a/ydb/core/fq/libs/ydb/ut/ydb_ut.cpp b/ydb/core/fq/libs/ydb/ut/ydb_ut.cpp new file mode 100644 index 000000000000..a567c93a7635 --- /dev/null +++ b/ydb/core/fq/libs/ydb/ut/ydb_ut.cpp @@ -0,0 +1,23 @@ +#include + +#include + +namespace NFq { + +Y_UNIT_TEST_SUITE(TFqYdbTest) { + + Y_UNIT_TEST(ShouldStatusToIssuesProcessExceptions) + { + auto promise = NThreading::NewPromise(); + auto future = promise.GetFuture(); + TString text("Test exception"); + promise.SetException(text); + NThreading::TFuture future2 = NFq::StatusToIssues(future); + + NYql::TIssues issues = future2.GetValueSync(); + UNIT_ASSERT(issues.Size() == 1); + UNIT_ASSERT(issues.ToString().Contains(text)); + } +}; + +} // namespace NFq diff --git a/ydb/core/fq/libs/ydb/ya.make b/ydb/core/fq/libs/ydb/ya.make index 887219ddac96..59711c1106f1 100644 --- a/ydb/core/fq/libs/ydb/ya.make +++ b/ydb/core/fq/libs/ydb/ya.make @@ -24,3 +24,7 @@ GENERATE_ENUM_SERIALIZATION(ydb.h) YQL_LAST_ABI_VERSION() END() + +RECURSE_FOR_TESTS( + ut +) \ No newline at end of file diff --git a/ydb/core/fq/libs/ydb/ydb.cpp b/ydb/core/fq/libs/ydb/ydb.cpp index ca72290bccc0..bd07edc39bc1 100644 --- a/ydb/core/fq/libs/ydb/ydb.cpp +++ b/ydb/core/fq/libs/ydb/ydb.cpp @@ -210,7 +210,13 @@ NYql::TIssues StatusToIssues(const NYdb::TStatus& status) { TFuture StatusToIssues(const TFuture& future) { return future.Apply( [] (const TFuture& future) { - return StatusToIssues(future.GetValue()); + try { + return StatusToIssues(future.GetValue()); + } catch (...) { + TIssues issues; + issues.AddIssue(CurrentExceptionMessage()); + return issues; + } }); } From a6330363dda34ea1a9982f177902d23c9b551fb3 Mon Sep 17 00:00:00 2001 From: Dmitry Kardymon Date: Fri, 2 Feb 2024 07:29:45 +0000 Subject: [PATCH 2/2] Add new line --- ydb/core/fq/libs/ydb/ya.make | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ydb/core/fq/libs/ydb/ya.make b/ydb/core/fq/libs/ydb/ya.make index 59711c1106f1..6e392e79c13b 100644 --- a/ydb/core/fq/libs/ydb/ya.make +++ b/ydb/core/fq/libs/ydb/ya.make @@ -27,4 +27,4 @@ END() RECURSE_FOR_TESTS( ut -) \ No newline at end of file +)