Skip to content

Commit

Permalink
Better v1 support
Browse files Browse the repository at this point in the history
  • Loading branch information
Hor911 committed Feb 19, 2024
1 parent 2fad3ae commit 2dd9c05
Show file tree
Hide file tree
Showing 5 changed files with 119 additions and 104 deletions.
4 changes: 2 additions & 2 deletions ydb/core/kqp/query_compiler/kqp_query_compiler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -974,8 +974,8 @@ class TKqpQueryCompiler : public IKqpQueryCompiler {
google::protobuf::Any& settings = *externalSource.MutableSettings();
TString& sourceType = *externalSource.MutableType();
dqIntegration->FillSourceSettings(source.Ref(), settings, sourceType, maxTasksPerStage);
YQL_ENSURE(!settings.type_url().empty(), "Data source provider \"" << dataSourceCategory << "\" did't fill dq source settings for its dq source node");
YQL_ENSURE(sourceType, "Data source provider \"" << dataSourceCategory << "\" did't fill dq source settings type for its dq source node");
YQL_ENSURE(!settings.type_url().empty(), "Data source provider \"" << dataSourceCategory << "\" didn't fill dq source settings for its dq source node");
YQL_ENSURE(sourceType, "Data source provider \"" << dataSourceCategory << "\" didn't fill dq source settings type for its dq source node");
}
}

Expand Down
20 changes: 12 additions & 8 deletions ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -605,7 +605,6 @@ class TS3FileQueueActor : public TActorBootstrapped<TS3FileQueueActor> {
}

// Actor shutdown hook: logs the event at debug level and then delegates to
// the base-class implementation. No stack trace is printed here, so a regular
// shutdown does not spam the process output.
void PassAway() override {
    LOG_D("TS3FileQueueActor", "PassAway");
    TBase::PassAway();
}
Expand Down Expand Up @@ -883,8 +882,6 @@ class TS3ReadActor : public TActorBootstrapped<TS3ReadActor>, public IDqComputeA
}

void Bootstrap() {
LOG_D("TS3ReadActor", "Bootstrap" << ", InputIndex: " << InputIndex);

if (!UseRuntimeListing) {
FileQueueActor = RegisterWithSameMailbox(new TS3FileQueueActor{
TxId,
Expand All @@ -902,6 +899,9 @@ class TS3ReadActor : public TActorBootstrapped<TS3ReadActor>, public IDqComputeA
PatternVariant,
ES3PatternType::Wildcard});
}

// Logged after the optional local queue registration above so that FileQueueActor
// is already set when it is printed; the suffix tells whether the queue is remote or local.
LOG_D("TS3ReadActor", "Bootstrap" << ", InputIndex: " << InputIndex << ", FileQueue: " << FileQueueActor << (UseRuntimeListing ? " (remote)" : " (local)"));

FileQueueEvents.Init(TxId, SelfId(), SelfId());
FileQueueEvents.OnNewRecipientId(FileQueueActor);
if (UseRuntimeListing && FileQueueConsumersCountDelta > 0) {
Expand Down Expand Up @@ -1007,7 +1007,7 @@ class TS3ReadActor : public TActorBootstrapped<TS3ReadActor>, public IDqComputeA
hFunc(TEvPrivate::TEvReadError, Handle);
hFunc(TEvS3FileQueue::TEvObjectPathBatch, HandleObjectPathBatch);
hFunc(TEvS3FileQueue::TEvObjectPathReadError, HandleObjectPathReadError);
hFunc(TEvS3FileQueue::TEvAck, Handle);
hFunc(TEvS3FileQueue::TEvAck, HandleAck);
hFunc(NYql::NDq::TEvRetryQueuePrivate::TEvRetry, Handle);
hFunc(NActors::TEvInterconnect::TEvNodeDisconnected, Handle);
hFunc(NActors::TEvInterconnect::TEvNodeConnected, Handle);
Expand All @@ -1020,6 +1020,7 @@ class TS3ReadActor : public TActorBootstrapped<TS3ReadActor>, public IDqComputeA

void HandleObjectPathBatch(TEvS3FileQueue::TEvObjectPathBatch::TPtr& objectPathBatch) {
if (!FileQueueEvents.OnEventReceived(objectPathBatch)) {
LOG_W("TS3ReadActor", "Duplicated TEvObjectPathBatch (likely resent) from " << FileQueueActor);
return;
}

Expand All @@ -1029,6 +1030,7 @@ class TS3ReadActor : public TActorBootstrapped<TS3ReadActor>, public IDqComputeA
ListedFiles += objectBatch.GetObjectPaths().size();
IsFileQueueEmpty = objectBatch.GetNoMoreFiles();
if (IsFileQueueEmpty && !IsConfirmedFileQueueFinish) {
LOG_D("TS3ReadActor", "Confirm finish to " << FileQueueActor);
SendPathBatchRequest();
IsConfirmedFileQueueFinish = true;
}
Expand All @@ -1045,11 +1047,13 @@ class TS3ReadActor : public TActorBootstrapped<TS3ReadActor>, public IDqComputeA
}
void HandleObjectPathReadError(TEvS3FileQueue::TEvObjectPathReadError::TPtr& result) {
if (!FileQueueEvents.OnEventReceived(result)) {
LOG_W("TS3ReadActor", "Duplicated TEvObjectPathReadError (likely resent) from " << FileQueueActor);
return;
}

IsFileQueueEmpty = true;
if (!IsConfirmedFileQueueFinish) {
LOG_D("TS3ReadActor", "Confirm finish (with errors) to " << FileQueueActor);
SendPathBatchRequest();
IsConfirmedFileQueueFinish = true;
}
Expand All @@ -1060,6 +1064,10 @@ class TS3ReadActor : public TActorBootstrapped<TS3ReadActor>, public IDqComputeA
Send(ComputeActorId, new TEvAsyncInputError(InputIndex, issues, NYql::NDqProto::StatusIds::EXTERNAL_ERROR));
}

// Handler for TEvS3FileQueue::TEvAck: passes the event to
// FileQueueEvents.OnEventReceived(). The return value is not inspected here.
void HandleAck(TEvS3FileQueue::TEvAck::TPtr& ev) {
FileQueueEvents.OnEventReceived(ev);
}

static void OnDownloadFinished(TActorSystem* actorSystem, TActorId selfId, const TString& requestId, IHTTPGateway::TResult&& result, size_t pathInd, const TString path) {
if (!result.Issues) {
actorSystem->Send(new IEventHandle(selfId, TActorId(), new TEvPrivate::TEvReadResult(std::move(result.Content), requestId, pathInd, path)));
Expand Down Expand Up @@ -1172,10 +1180,6 @@ class TS3ReadActor : public TActorBootstrapped<TS3ReadActor>, public IDqComputeA
Send(ComputeActorId, new TEvAsyncInputError(InputIndex, std::move(issues), NYql::NDqProto::StatusIds::EXTERNAL_ERROR));
}

// Handler overload for TEvS3FileQueue::TEvAck: passes the event to
// FileQueueEvents.OnEventReceived(). The return value is not inspected here.
void Handle(TEvS3FileQueue::TEvAck::TPtr& ev) {
FileQueueEvents.OnEventReceived(ev);
}

// Handler overload for TEvRetryQueuePrivate::TEvRetry: calls
// FileQueueEvents.Retry(). The event payload itself is unused (unnamed parameter).
void Handle(const NYql::NDq::TEvRetryQueuePrivate::TEvRetry::TPtr&) {
FileQueueEvents.Retry();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,8 @@ class TS3DqIntegration: public TDqIntegrationBase {
}

auto useRuntimeListing = State_->Configuration->UseRuntimeListing.Get().GetOrElse(false);

YQL_CLOG(DEBUG, ProviderS3) << " useRuntimeListing=" << useRuntimeListing;
if (useRuntimeListing) {
size_t partitionCount = hasDirectories ? maxPartitions : Min(parts.size(), maxPartitions);
partitions.reserve(partitionCount);
Expand All @@ -117,6 +119,7 @@ class TS3DqIntegration: public TDqIntegrationBase {
TStringOutput out(partitions.back());
range.Save(&out);
}
YQL_CLOG(DEBUG, ProviderS3) << " hasDirectories=" << hasDirectories << ", partitionCount=" << partitionCount << ", maxPartitions=" << maxPartitions;
return 0;
}

Expand Down Expand Up @@ -158,6 +161,7 @@ class TS3DqIntegration: public TDqIntegrationBase {
range.Save(&out);
}

// Debug summary of the computed partitioning before returning.
YQL_CLOG(DEBUG, ProviderS3) << " hasDirectories=" << hasDirectories << ", partitionCount=" << partitions.size() << ", maxPartitions=" << maxPartitions;
return 0;
}

Expand Down Expand Up @@ -416,6 +420,8 @@ class TS3DqIntegration: public TDqIntegrationBase {
auto fileQueueBatchObjectCountLimit = State_->Configuration->FileQueueBatchObjectCountLimit.Get().GetOrElse(1000);
srcDesc.MutableSettings()->insert({"fileQueueBatchObjectCountLimit", ToString(fileQueueBatchObjectCountLimit)});

YQL_CLOG(DEBUG, ProviderS3) << " useRuntimeListing=" << useRuntimeListing;

if (useRuntimeListing) {
TPathList paths;
for (auto i = 0u; i < settings.Paths().Size(); ++i) {
Expand Down Expand Up @@ -481,12 +487,13 @@ class TS3DqIntegration: public TDqIntegrationBase {
<< "Unknown 'pathpatternvariant': " << pathPatternVariantValue->second;
}
}

auto consumersCount = hasDirectories ? maxPartitions : paths.size();

auto fileQueuePrefetchSize = State_->Configuration->FileQueuePrefetchSize.Get()
.GetOrElse(consumersCount * srcDesc.GetParallelDownloadCount() * 3);

YQL_CLOG(DEBUG, ProviderS3) << " hasDirectories=" << hasDirectories << ", consumersCount=" << consumersCount;

auto fileQueueActor = NActors::TActivationContext::ActorSystem()->Register(NDq::CreateS3FileQueueActor(
0ul,
std::move(paths),
Expand Down
Loading

0 comments on commit 2dd9c05

Please sign in to comment.