Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix for dynamic listing (#4525) #4604

Merged
merged 1 commit into from
May 16, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 12 additions & 8 deletions ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -417,11 +417,14 @@ class TS3FileQueueActor : public TActorBootstrapped<TS3FileQueueActor> {
object.SetPath(paths[i].Path);
object.SetPathIndex(paths[i].PathIndex);
if (paths[i].IsDirectory) {
LOG_T("TS3FileQueueActor", "TS3FileQueueActor adding dir: " << paths[i].Path);
object.SetSize(0);
Directories.emplace_back(std::move(object));
} else {
LOG_T("TS3FileQueueActor", "TS3FileQueueActor adding path: " << paths[i].Path << " of size " << paths[i].Size);
object.SetSize(paths[i].Size);
Objects.emplace_back(std::move(object));
ObjectsTotalSize += paths[i].Size;
}
}
}
Expand Down Expand Up @@ -496,9 +499,9 @@ class TS3FileQueueActor : public TActorBootstrapped<TS3FileQueueActor> {
}

bool SaveRetrievedResults(const NS3Lister::TListResult& listingResult) {
LOG_T("TS3FileQueueActor", "SaveRetrievedResults");
if (std::holds_alternative<NS3Lister::TListError>(listingResult)) {
MaybeIssues = std::get<NS3Lister::TListError>(listingResult).Issues;
LOG_E("TS3FileQueueActor", "SaveRetrievedResults error: [" << (MaybeIssues ? MaybeIssues->ToOneLineString() : "") << "]");
return false;
}

Expand All @@ -519,7 +522,7 @@ class TS3FileQueueActor : public TActorBootstrapped<TS3FileQueueActor> {
MaybeIssues = TIssues{TIssue{errorMessage}};
return false;
}
LOG_T("TS3FileQueueActor", "SaveRetrievedResults adding path: " << object.Path);
LOG_T("TS3FileQueueActor", "SaveRetrievedResults adding path: " << object.Path << " of size " << object.Size);
TObjectPath objectPath;
objectPath.SetPath(object.Path);
objectPath.SetSize(object.Size);
Expand Down Expand Up @@ -577,7 +580,7 @@ class TS3FileQueueActor : public TActorBootstrapped<TS3FileQueueActor> {
switch (const auto etype = ev->GetTypeRewrite()) {
hFunc(TEvS3FileQueue::TEvUpdateConsumersCount, HandleUpdateConsumersCount);
hFunc(TEvS3FileQueue::TEvGetNextBatch, HandleGetNextBatchForErrorState);
cFunc(TEvPrivatePrivate::EvRoundRobinStageTimeout, HandleRoundRobinStageTimeout);
cFunc(TEvPrivatePrivate::TEvRoundRobinStageTimeout::EventType, HandleRoundRobinStageTimeout);
cFunc(TEvents::TSystem::Poison, HandlePoison);
default:
MaybeIssues = TIssues{TIssue{TStringBuilder() << "An event with unknown type has been received: '" << etype << "'"}};
Expand All @@ -598,17 +601,18 @@ class TS3FileQueueActor : public TActorBootstrapped<TS3FileQueueActor> {

void HandleUpdateConsumersCount(TEvS3FileQueue::TEvUpdateConsumersCount::TPtr& ev) {
if (!UpdatedConsumers.contains(ev->Sender)) {
LOG_D(
"TS3FileQueueActor",
"HandleUpdateConsumersCount Reducing ConsumersCount by " << ev->Get()->Record.GetConsumersCountDelta() << ", recieved from " << ev->Sender);
UpdatedConsumers.insert(ev->Sender);
ConsumersCount -= ev->Get()->Record.GetConsumersCountDelta();
LOG_D(
"TS3FileQueueActor",
"HandleUpdateConsumersCount Reducing ConsumersCount by " << ev->Get()->Record.GetConsumersCountDelta()
<< " to " << ConsumersCount << ", received from " << ev->Sender);
}
Send(ev->Sender, new TEvS3FileQueue::TEvAck(ev->Get()->Record.GetTransportMeta()));
}

void HandleRoundRobinStageTimeout() {
LOG_T("TS3FileQueueActor","Handle start stage timeout");
LOG_D("TS3FileQueueActor","Handle start stage timeout");
if (!RoundRobinStageFinished) {
RoundRobinStageFinished = true;
AnswerPendingRequests();
Expand Down Expand Up @@ -647,7 +651,7 @@ class TS3FileQueueActor : public TActorBootstrapped<TS3FileQueueActor> {
ObjectsTotalSize -= totalSize;
}

LOG_T("TS3FileQueueActor", "SendObjects Sending " << result.size() << " objects to consumer with id " << consumer);
LOG_D("TS3FileQueueActor", "SendObjects Sending " << result.size() << " objects to consumer with id " << consumer << ", " << ObjectsTotalSize << " bytes left");
Send(consumer, new TEvS3FileQueue::TEvObjectPathBatch(std::move(result), HasNoMoreItems(), transportMeta));

if (HasNoMoreItems()) {
Expand Down
Loading