Skip to content

Commit

Permalink
Handle exceptions in listing callback
Browse files Browse the repository at this point in the history
  • Loading branch information
Hor911 committed Feb 21, 2024
1 parent 012caf5 commit be50758
Showing 1 changed file with 28 additions and 8 deletions.
36 changes: 28 additions & 8 deletions ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -354,21 +354,28 @@ class TS3FileQueueActor : public TActorBootstrapped<TS3FileQueueActor> {

EvNextListingChunkReceived = EvBegin,
EvRoundRobinStageTimeout,
EvTransitToErrorState,

EvEnd
};
static_assert(
EvEnd <= EventSpaceEnd(TEvents::ES_PRIVATE),
"expected EvEnd <= EventSpaceEnd(TEvents::ES_PRIVATE)");

struct TEvNextListingChunkReceived :
public TEventLocal<TEvNextListingChunkReceived, EvNextListingChunkReceived> {
struct TEvNextListingChunkReceived : public TEventLocal<TEvNextListingChunkReceived, EvNextListingChunkReceived> {
NS3Lister::TListResult ListingResult;
TEvNextListingChunkReceived(NS3Lister::TListResult listingResult)
: ListingResult(std::move(listingResult)){};
};
struct TEvRoundRobinStageTimeout :
public TEventLocal<TEvRoundRobinStageTimeout, EvRoundRobinStageTimeout> {

struct TEvRoundRobinStageTimeout : public TEventLocal<TEvRoundRobinStageTimeout, EvRoundRobinStageTimeout> {
};

struct TEvTransitToErrorState : public TEventLocal<TEvTransitToErrorState, EvTransitToErrorState> {
explicit TEvTransitToErrorState(TIssues&& issues)
: Issues(issues) {
}
TIssues Issues;
};
};
using TBase = TActorBootstrapped<TS3FileQueueActor>;
Expand Down Expand Up @@ -437,6 +444,7 @@ class TS3FileQueueActor : public TActorBootstrapped<TS3FileQueueActor> {
hFunc(TEvS3FileQueue::TEvGetNextBatch, HandleGetNextBatch);
hFunc(TEvPrivatePrivate::TEvNextListingChunkReceived, HandleNextListingChunkReceived);
cFunc(TEvPrivatePrivate::EvRoundRobinStageTimeout, HandleRoundRobinStageTimeout);
hFunc(TEvPrivatePrivate::TEvTransitToErrorState, HandleTransitToErrorState);
cFunc(TEvents::TSystem::Poison, HandlePoison);
default:
MaybeIssues = TIssues{TIssue{TStringBuilder() << "An event with unknown type has been received: '" << etype << "'"}};
Expand Down Expand Up @@ -479,6 +487,11 @@ class TS3FileQueueActor : public TActorBootstrapped<TS3FileQueueActor> {
}
}

void HandleTransitToErrorState(TEvPrivatePrivate::TEvTransitToErrorState::TPtr& ev) {
MaybeIssues = ev->Get().Issues;
TransitToErrorState();
}

bool SaveRetrievedResults(const NS3Lister::TListResult& listingResult) {
LOG_T("TS3FileQueueActor", "SaveRetrievedResults");
if (std::holds_alternative<NS3Lister::TListError>(listingResult)) {
Expand Down Expand Up @@ -717,10 +730,17 @@ class TS3FileQueueActor : public TActorBootstrapped<TS3FileQueueActor> {
->Next()
.Subscribe([actorSystem, selfId = SelfId()](
const NThreading::TFuture<NS3Lister::TListResult>& future) {
actorSystem->Send(
selfId,
new TEvPrivatePrivate::TEvNextListingChunkReceived(
future.GetValue()));
try {
actorSystem->Send(
selfId,
new TEvPrivatePrivate::TEvNextListingChunkReceived(
future.GetValue()));
} catch (const std::exception& e) {
actorSystem->Send(
selfId,
new TEvPrivatePrivate::TEvTransitToErrorState(
TIssues{TIssue{TStringBuilder() << "An unknown exception has occurred: '" << e.what() << "'"}}));
}
});
}

Expand Down

0 comments on commit be50758

Please sign in to comment.