diff --git a/ydb/library/yql/core/facade/yql_facade.cpp b/ydb/library/yql/core/facade/yql_facade.cpp index 239f7d85215a..93e79224271f 100644 --- a/ydb/library/yql/core/facade/yql_facade.cpp +++ b/ydb/library/yql/core/facade/yql_facade.cpp @@ -80,13 +80,13 @@ TProgram::TStatus SyncExecution( (program->*method)(std::forward(params)...); YQL_ENSURE(future.Initialized()); future.Wait(); - YQL_ENSURE(!future.HasException()); + HandleFutureException(future); TProgram::TStatus status = future.GetValue(); while (status == TProgram::TStatus::Async) { auto continueFuture = program->ContinueAsync(); continueFuture.Wait(); - YQL_ENSURE(!continueFuture.HasException()); + HandleFutureException(continueFuture); status = continueFuture.GetValue(); } diff --git a/ydb/library/yql/core/yql_execution.cpp b/ydb/library/yql/core/yql_execution.cpp index c8c4d93ae4ba..342925e342a1 100644 --- a/ydb/library/yql/core/yql_execution.cpp +++ b/ydb/library/yql/core/yql_execution.cpp @@ -525,12 +525,12 @@ class TExecutionTransformer : public TGraphTransformerBase { if (DeterministicMode) { future.Subscribe([state](const NThreading::TFuture& future) { - YQL_ENSURE(!future.HasException()); + HandleFutureException(future); ProcessFutureResultQueue(state); }); } else { future.Subscribe([state, node=node.Get(), dataProvider](const NThreading::TFuture& future) { - YQL_ENSURE(!future.HasException()); + HandleFutureException(future); TAutoPtr item = new TState::TItem; item->Node = node; item->DataProvider = dataProvider; diff --git a/ydb/library/yql/core/yql_graph_transformer.cpp b/ydb/library/yql/core/yql_graph_transformer.cpp index 8d0f6c454342..fddab7c6c6a9 100644 --- a/ydb/library/yql/core/yql_graph_transformer.cpp +++ b/ydb/library/yql/core/yql_graph_transformer.cpp @@ -249,7 +249,7 @@ IGraphTransformer::TStatus SyncTransform(IGraphTransformer& transformer, TExprNo auto future = transformer.GetAsyncFuture(*root); future.Wait(); - YQL_ENSURE(!future.HasException()); + HandleFutureException(future); status = transformer.ApplyAsyncChanges(root, newRoot, ctx); if (newRoot) { @@ -377,7 +377,7 @@ void AsyncTransform(IGraphTransformer& transformer, TExprNode::TPtr& root, TExpr NThreading::TFuture status = AsyncTransform(transformer, root, ctx, applyAsyncChanges); status.Subscribe( [asyncCallback](const NThreading::TFuture& status) mutable -> void { - YQL_ENSURE(!status.HasException()); + HandleFutureException(status); asyncCallback(status.GetValue()); }); } diff --git a/ydb/library/yql/core/yql_graph_transformer.h b/ydb/library/yql/core/yql_graph_transformer.h index a0bccb0f9e2e..1129abfc8323 100644 --- a/ydb/library/yql/core/yql_graph_transformer.h +++ b/ydb/library/yql/core/yql_graph_transformer.h @@ -236,6 +236,17 @@ void AsyncTransform(IGraphTransformer& transformer, TExprNode::TPtr& root, TExpr IGraphTransformer::TStatus AsyncTransformStep(IGraphTransformer& transformer, TExprNode::TPtr& root, TExprContext& ctx, bool applyAsyncChanges); +template +void HandleFutureException(const NThreading::TFuture& future) { + if (future.HasException()) { + try { + future.TryRethrow(); + } catch (...) { + throw yexception() << "Unexpected future exception: " << CurrentExceptionMessage(); + } + } +} + class TSyncTransformerBase : public TGraphTransformerBase { public: NThreading::TFuture DoGetAsyncFuture(const TExprNode& input) final { diff --git a/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp b/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp index 1caaa862d2b1..7ab68f5ab9bf 100644 --- a/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp +++ b/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp @@ -1459,7 +1459,7 @@ class TDqExecTransformer: public TExecTransformerBase, TCounters settings, progressWriter, UploadCache_->ModulesMapping, fillSettings.Discard, executionTimeout); future.Subscribe([publicIds, progressWriter = State->ProgressWriter](const NThreading::TFuture& completedFuture) { - YQL_ENSURE(!completedFuture.HasException()); + HandleFutureException(completedFuture); MarkProgressFinished(publicIds->AllPublicIds, completedFuture.GetValueSync().Success(), progressWriter); }); executionPlanner.Destroy(); @@ -1856,7 +1856,7 @@ class TDqExecTransformer: public TExecTransformerBase, TCounters if (filesRes.first.Level == TStatus::Async) { precomputeFuture = filesRes.second.Apply([execState = ExecPrecomputeState_, node = input.Get(), logCtx](const TAsyncTransformCallbackFuture& future) { YQL_LOG_CTX_ROOT_SESSION_SCOPE(logCtx); - YQL_ENSURE(!future.HasException()); + HandleFutureException(future); YQL_CLOG(DEBUG, ProviderDq) << "Finishing freezing files"; CompleteNode(execState, node, future.GetValue()); }); @@ -1980,7 +1980,7 @@ class TDqExecTransformer: public TExecTransformerBase, TCounters bool neverFallback = settings->FallbackPolicy.Get().GetOrElse(EFallbackPolicy::Default) == EFallbackPolicy::Never; precomputeFuture = future.Apply([publicIds, state = State, startTime, execState = ExecPrecomputeState_, node = input.Get(), neverFallback, logCtx](const NThreading::TFuture& completedFuture) { YQL_LOG_CTX_ROOT_SESSION_SCOPE(logCtx); - YQL_ENSURE(!completedFuture.HasException()); + HandleFutureException(completedFuture); const IDqGateway::TResult& res = completedFuture.GetValueSync(); MarkProgressFinished(publicIds->AllPublicIds, res.Success(), state->ProgressWriter);