Skip to content

Commit

Permalink
Handle unexpected future exception (#11091)
Browse files Browse the repository at this point in the history
  • Loading branch information
vitstn authored Oct 30, 2024
1 parent 5d45abf commit 8140e4b
Show file tree
Hide file tree
Showing 5 changed files with 20 additions and 9 deletions.
4 changes: 2 additions & 2 deletions ydb/library/yql/core/facade/yql_facade.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,13 +80,13 @@ TProgram::TStatus SyncExecution(
(program->*method)(std::forward<Params2>(params)...);
YQL_ENSURE(future.Initialized());
future.Wait();
YQL_ENSURE(!future.HasException());
HandleFutureException(future);

TProgram::TStatus status = future.GetValue();
while (status == TProgram::TStatus::Async) {
auto continueFuture = program->ContinueAsync();
continueFuture.Wait();
YQL_ENSURE(!continueFuture.HasException());
HandleFutureException(continueFuture);
status = continueFuture.GetValue();
}

Expand Down
4 changes: 2 additions & 2 deletions ydb/library/yql/core/yql_execution.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -525,12 +525,12 @@ class TExecutionTransformer : public TGraphTransformerBase {

if (DeterministicMode) {
future.Subscribe([state](const NThreading::TFuture<void>& future) {
YQL_ENSURE(!future.HasException());
HandleFutureException(future);
ProcessFutureResultQueue(state);
});
} else {
future.Subscribe([state, node=node.Get(), dataProvider](const NThreading::TFuture<void>& future) {
YQL_ENSURE(!future.HasException());
HandleFutureException(future);

TAutoPtr<TState::TItem> item = new TState::TItem;
item->Node = node; item->DataProvider = dataProvider;
Expand Down
4 changes: 2 additions & 2 deletions ydb/library/yql/core/yql_graph_transformer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@ IGraphTransformer::TStatus SyncTransform(IGraphTransformer& transformer, TExprNo

auto future = transformer.GetAsyncFuture(*root);
future.Wait();
YQL_ENSURE(!future.HasException());
HandleFutureException(future);

status = transformer.ApplyAsyncChanges(root, newRoot, ctx);
if (newRoot) {
Expand Down Expand Up @@ -377,7 +377,7 @@ void AsyncTransform(IGraphTransformer& transformer, TExprNode::TPtr& root, TExpr
NThreading::TFuture<IGraphTransformer::TStatus> status = AsyncTransform(transformer, root, ctx, applyAsyncChanges);
status.Subscribe(
[asyncCallback](const NThreading::TFuture<IGraphTransformer::TStatus>& status) mutable -> void {
YQL_ENSURE(!status.HasException());
HandleFutureException(status);
asyncCallback(status.GetValue());
});
}
Expand Down
11 changes: 11 additions & 0 deletions ydb/library/yql/core/yql_graph_transformer.h
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,17 @@ void AsyncTransform(IGraphTransformer& transformer, TExprNode::TPtr& root, TExpr
IGraphTransformer::TStatus AsyncTransformStep(IGraphTransformer& transformer, TExprNode::TPtr& root,
TExprContext& ctx, bool applyAsyncChanges);

template <typename T>
void HandleFutureException(const NThreading::TFuture<T>& future) {
if (future.HasException()) {
try {
future.TryRethrow();
} catch (...) {
throw yexception() << "Unexpected future exception: " << CurrentExceptionMessage();
}
}
}

class TSyncTransformerBase : public TGraphTransformerBase {
public:
NThreading::TFuture<void> DoGetAsyncFuture(const TExprNode& input) final {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1459,7 +1459,7 @@ class TDqExecTransformer: public TExecTransformerBase, TCounters
settings, progressWriter, UploadCache_->ModulesMapping, fillSettings.Discard, executionTimeout);

future.Subscribe([publicIds, progressWriter = State->ProgressWriter](const NThreading::TFuture<IDqGateway::TResult>& completedFuture) {
YQL_ENSURE(!completedFuture.HasException());
HandleFutureException(completedFuture);
MarkProgressFinished(publicIds->AllPublicIds, completedFuture.GetValueSync().Success(), progressWriter);
});
executionPlanner.Destroy();
Expand Down Expand Up @@ -1856,7 +1856,7 @@ class TDqExecTransformer: public TExecTransformerBase, TCounters
if (filesRes.first.Level == TStatus::Async) {
precomputeFuture = filesRes.second.Apply([execState = ExecPrecomputeState_, node = input.Get(), logCtx](const TAsyncTransformCallbackFuture& future) {
YQL_LOG_CTX_ROOT_SESSION_SCOPE(logCtx);
YQL_ENSURE(!future.HasException());
HandleFutureException(future);
YQL_CLOG(DEBUG, ProviderDq) << "Finishing freezing files";
CompleteNode(execState, node, future.GetValue());
});
Expand Down Expand Up @@ -1980,7 +1980,7 @@ class TDqExecTransformer: public TExecTransformerBase, TCounters
bool neverFallback = settings->FallbackPolicy.Get().GetOrElse(EFallbackPolicy::Default) == EFallbackPolicy::Never;
precomputeFuture = future.Apply([publicIds, state = State, startTime, execState = ExecPrecomputeState_, node = input.Get(), neverFallback, logCtx](const NThreading::TFuture<IDqGateway::TResult>& completedFuture) {
YQL_LOG_CTX_ROOT_SESSION_SCOPE(logCtx);
YQL_ENSURE(!completedFuture.HasException());
HandleFutureException(completedFuture);
const IDqGateway::TResult& res = completedFuture.GetValueSync();

MarkProgressFinished(publicIds->AllPublicIds, res.Success(), state->ProgressWriter);
Expand Down

0 comments on commit 8140e4b

Please sign in to comment.