Skip to content

Commit

Permalink
Dynamic listing mass fix
Browse files Browse the repository at this point in the history
  • Loading branch information
Hor911 committed Feb 14, 2024
1 parent b1f681e commit b50b536
Show file tree
Hide file tree
Showing 25 changed files with 121 additions and 123 deletions.
7 changes: 3 additions & 4 deletions ydb/core/kqp/host/kqp_host.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1475,12 +1475,11 @@ class TKqpHost : public IKqpHost {
state->CredentialsFactory = FederatedQuerySetup->CredentialsFactory;
state->Configuration->WriteThroughDqIntegration = true;
state->Configuration->AllowAtomicUploadCommit = queryType == EKikimrQueryType::Script;
state->MaxTasksPerStage = SessionCtx->ConfigPtr()->MaxTasksPerStage.Get();

state->Configuration->Init(FederatedQuerySetup->S3GatewayConfig, TypesCtx);
state->Gateway = FederatedQuerySetup->HttpGateway;

auto dataSource = NYql::CreateS3DataSource(state, FederatedQuerySetup->HttpGateway);
auto dataSink = NYql::CreateS3DataSink(state, FederatedQuerySetup->HttpGateway);
auto dataSource = NYql::CreateS3DataSource(state);
auto dataSink = NYql::CreateS3DataSink(state);

TypesCtx->AddDataSource(NYql::S3ProviderName, std::move(dataSource));
TypesCtx->AddDataSink(NYql::S3ProviderName, std::move(dataSink));
Expand Down
11 changes: 6 additions & 5 deletions ydb/core/kqp/query_compiler/kqp_query_compiler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -946,11 +946,6 @@ class TKqpQueryCompiler : public IKqpQueryCompiler {
NYql::IDqIntegration* dqIntegration = provider->second->GetDqIntegration();
YQL_ENSURE(dqIntegration, "Unsupported dq source for provider: \"" << dataSourceCategory << "\"");
auto& externalSource = *protoSource->MutableExternalSource();
google::protobuf::Any& settings = *externalSource.MutableSettings();
TString& sourceType = *externalSource.MutableType();
dqIntegration->FillSourceSettings(source.Ref(), settings, sourceType);
YQL_ENSURE(!settings.type_url().empty(), "Data source provider \"" << dataSourceCategory << "\" did't fill dq source settings for its dq source node");
YQL_ENSURE(sourceType, "Data source provider \"" << dataSourceCategory << "\" did't fill dq source settings type for its dq source node");

// Partitioning
TVector<TString> partitionParams;
Expand All @@ -975,6 +970,12 @@ class TKqpQueryCompiler : public IKqpQueryCompiler {
externalSource.SetAuthInfo(CreateStructuredTokenParser(token).ToBuilder().RemoveSecrets().ToJson());
CreateStructuredTokenParser(token).ListReferences(SecretNames);
}

google::protobuf::Any& settings = *externalSource.MutableSettings();
TString& sourceType = *externalSource.MutableType();
dqIntegration->FillSourceSettings(source.Ref(), settings, sourceType, maxTasksPerStage);
YQL_ENSURE(!settings.type_url().empty(), "Data source provider \"" << dataSourceCategory << "\" did't fill dq source settings for its dq source node");
YQL_ENSURE(sourceType, "Data source provider \"" << dataSourceCategory << "\" did't fill dq source settings type for its dq source node");
}
}

Expand Down
10 changes: 10 additions & 0 deletions ydb/library/yql/dq/actors/compute/retry_queue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,16 @@ void TRetryEventsQueue::HandleNodeConnected(ui32 nodeId) {
}
}

// Handles a TEvUndelivered notification on behalf of this retry queue.
//
// The notification is consumed only when it was sent by the current recipient
// (RecipientId) and its reason is TEvUndelivered::Disconnected. In that case the
// queue marks itself as not connected, schedules a retry and returns true.
// For any other sender or reason nothing is changed and false is returned.
bool TRetryEventsQueue::HandleUndelivered(NActors::TEvents::TEvUndelivered::TPtr& ev) {
    // Sender is checked first; the reason is only inspected for our recipient.
    if (!(ev->Sender == RecipientId)) {
        return false;
    }

    if (!(ev->Get()->Reason == NActors::TEvents::TEvUndelivered::Disconnected)) {
        return false;
    }

    Connected = false;
    ScheduleRetry();
    return true;
}

void TRetryEventsQueue::Retry() {
RetryScheduled = false;
if (!Connected) {
Expand Down
3 changes: 2 additions & 1 deletion ydb/library/yql/dq/actors/compute/retry_queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ class TRetryEventsQueue {
void OnNewRecipientId(const NActors::TActorId& recipientId, bool unsubscribe = true);
void HandleNodeConnected(ui32 nodeId);
void HandleNodeDisconnected(ui32 nodeId);
bool HandleUndelivered(NActors::TEvents::TEvUndelivered::TPtr& ev);
void Retry();
void Unsubscribe();

Expand Down Expand Up @@ -165,7 +166,7 @@ class TRetryEventsQueue {
THolder<T> ev = MakeHolder<T>();
ev->Record = Event->Record;
ev->Record.MutableTransportMeta()->SetConfirmedSeqNo(confirmedSeqNo);
return MakeHolder<NActors::IEventHandle>(Recipient, Sender, ev.Release(), 0, Cookie);
return MakeHolder<NActors::IEventHandle>(Recipient, Sender, ev.Release(), NActors::IEventHandle::FlagTrackDelivery, Cookie);
}

private:
Expand Down
2 changes: 1 addition & 1 deletion ydb/library/yql/dq/integration/yql_dq_integration.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ class IDqIntegration {
virtual bool CanBlockRead(const NNodes::TExprBase& node, TExprContext& ctx, TTypeAnnotationContext& typesCtx) = 0;
virtual void RegisterMkqlCompiler(NCommon::TMkqlCallableCompilerBase& compiler) = 0;
virtual bool CanFallback() = 0;
virtual void FillSourceSettings(const TExprNode& node, ::google::protobuf::Any& settings, TString& sourceType) = 0;
virtual void FillSourceSettings(const TExprNode& node, ::google::protobuf::Any& settings, TString& sourceType, size_t maxPartitions) = 0;
virtual void FillSinkSettings(const TExprNode& node, ::google::protobuf::Any& settings, TString& sinkType) = 0;
virtual void FillTransformSettings(const TExprNode& node, ::google::protobuf::Any& settings) = 0;
virtual void Annotate(const TExprNode& node, THashMap<TString, TString>& params) = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ class TClickHouseDqIntegration: public TDqIntegrationBase {
return 0ULL;
}

void FillSourceSettings(const TExprNode& node, ::google::protobuf::Any& protoSettings, TString& sourceType) override {
void FillSourceSettings(const TExprNode& node, ::google::protobuf::Any& protoSettings, TString& sourceType, size_t) override {
const TDqSource source(&node);
if (const auto maySettings = source.Settings().Maybe<TClSourceSettings>()) {
const auto settings = maySettings.Cast();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ bool TDqIntegrationBase::CanFallback() {
return false;
}

void TDqIntegrationBase::FillSourceSettings(const TExprNode&, ::google::protobuf::Any&, TString&) {
void TDqIntegrationBase::FillSourceSettings(const TExprNode&, ::google::protobuf::Any&, TString&, size_t) {
}

void TDqIntegrationBase::FillSinkSettings(const TExprNode&, ::google::protobuf::Any&, TString&) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ class TDqIntegrationBase: public IDqIntegration {
bool CanBlockRead(const NNodes::TExprBase& node, TExprContext& ctx, TTypeAnnotationContext& typesCtx) override;
TExprNode::TPtr WrapWrite(const TExprNode::TPtr& write, TExprContext& ctx) override;
bool CanFallback() override;
void FillSourceSettings(const TExprNode& node, ::google::protobuf::Any& settings, TString& sourceType) override;
void FillSourceSettings(const TExprNode& node, ::google::protobuf::Any& settings, TString& sourceType, size_t) override;
void FillSinkSettings(const TExprNode& node, ::google::protobuf::Any& settings, TString& sinkType) override;
void FillTransformSettings(const TExprNode& node, ::google::protobuf::Any& settings) override;
void Annotate(const TExprNode& node, THashMap<TString, TString>& params) override;
Expand Down
2 changes: 1 addition & 1 deletion ydb/library/yql/providers/dq/planner/execution_planner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -558,7 +558,7 @@ namespace NYql::NDqs {
TString sourceType;
if (dqSource) {
sourceSettings.ConstructInPlace();
dqIntegration->FillSourceSettings(*read, *sourceSettings, sourceType);
dqIntegration->FillSourceSettings(*read, *sourceSettings, sourceType, maxPartitions);
YQL_ENSURE(!sourceSettings->type_url().empty(), "Data source provider \"" << dataSourceName << "\" did't fill dq source settings for its dq source node");
YQL_ENSURE(sourceType, "Data source provider \"" << dataSourceName << "\" did't fill dq source settings type for its dq source node");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ class TBuildDqSourceSettingsTransformer: public TOptimizeTransformerBase {
.Ptr();
::google::protobuf::Any settings;
TString sourceType;
dqIntegration->FillSourceSettings(*dqSourceNode, settings, sourceType);
dqIntegration->FillSourceSettings(*dqSourceNode, settings, sourceType, 1);
UNIT_ASSERT_STRINGS_EQUAL(sourceType, "PostgreSqlGeneric");
UNIT_ASSERT(settings.Is<Generic::TSource>());
settings.UnpackTo(DqSourceSettings_);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ namespace NYql {
}

void FillSourceSettings(const TExprNode& node, ::google::protobuf::Any& protoSettings,
TString& sourceType) override {
TString& sourceType, size_t) override {
const TDqSource source(&node);
if (const auto maybeSettings = source.Settings().Maybe<TGenSourceSettings>()) {
const auto settings = maybeSettings.Cast();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ class TPqDqIntegration: public TDqIntegrationBase {
}
}

void FillSourceSettings(const TExprNode& node, ::google::protobuf::Any& protoSettings, TString& sourceType) override {
void FillSourceSettings(const TExprNode& node, ::google::protobuf::Any& protoSettings, TString& sourceType, size_t) override {
if (auto maybeDqSource = TMaybeNode<TDqSource>(&node)) {
auto settings = maybeDqSource.Cast().Settings();
if (auto maybeTopicSource = TMaybeNode<TDqPqTopicSource>(settings.Raw())) {
Expand Down
88 changes: 63 additions & 25 deletions ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -163,19 +163,15 @@ struct TEvS3FileQueue {
struct TEvUpdateConsumersCount :
public TEventPB<TEvUpdateConsumersCount, NS3::FileQueue::TEvUpdateConsumersCount, EvUpdateConsumersCount> {

TEvUpdateConsumersCount() {
Record.SetConsumersCountDelta(0);
}

explicit TEvUpdateConsumersCount(ui64 consumersCountDelta) {
explicit TEvUpdateConsumersCount(ui64 consumersCountDelta = 0) {
Record.SetConsumersCountDelta(consumersCountDelta);
}
};

struct TEvAck :
public TEventPB<TEvAck, NS3::FileQueue::TEvAck, EvAck> {

TEvAck() {}
TEvAck() = default;

explicit TEvAck(const TMessageTransportMeta& transportMeta) {
Record.MutableTransportMeta()->CopyFrom(transportMeta);
Expand Down Expand Up @@ -205,7 +201,7 @@ struct TEvS3FileQueue {
struct TEvObjectPathReadError :
public NActors::TEventPB<TEvObjectPathReadError, NS3::FileQueue::TEvObjectPathReadError, EvObjectPathReadError> {

TEvObjectPathReadError() {}
TEvObjectPathReadError() = default;

TEvObjectPathReadError(TIssues issues, const TMessageTransportMeta& transportMeta) {
IssuesToMessage(issues, Record.MutableIssues());
Expand Down Expand Up @@ -441,7 +437,7 @@ class TS3FileQueueActor : public TActorBootstrapped<TS3FileQueueActor> {
hFunc(TEvS3FileQueue::TEvGetNextBatch, HandleGetNextBatch);
hFunc(TEvPrivatePrivate::TEvNextListingChunkReceived, HandleNextListingChunkReceived);
cFunc(TEvPrivatePrivate::EvRoundRobinStageTimeout, HandleRoundRobinStageTimeout);
cFunc(TEvents::TSystem::Poison, PassAway);
cFunc(TEvents::TSystem::Poison, HandlePoison);
default:
MaybeIssues = TIssues{TIssue{TStringBuilder() << "An event with unknown type has been received: '" << etype << "'"}};
TransitToErrorState();
Expand Down Expand Up @@ -541,7 +537,7 @@ class TS3FileQueueActor : public TActorBootstrapped<TS3FileQueueActor> {
hFunc(TEvS3FileQueue::TEvUpdateConsumersCount, HandleUpdateConsumersCount);
hFunc(TEvS3FileQueue::TEvGetNextBatch, HandleGetNextBatchForEmptyState);
cFunc(TEvPrivatePrivate::EvRoundRobinStageTimeout, HandleRoundRobinStageTimeout);
cFunc(TEvents::TSystem::Poison, PassAway);
cFunc(TEvents::TSystem::Poison, HandlePoison);
default:
MaybeIssues = TIssues{TIssue{TStringBuilder() << "An event with unknown type has been received: '" << etype << "'"}};
TransitToErrorState();
Expand All @@ -566,7 +562,7 @@ class TS3FileQueueActor : public TActorBootstrapped<TS3FileQueueActor> {
hFunc(TEvS3FileQueue::TEvUpdateConsumersCount, HandleUpdateConsumersCount);
hFunc(TEvS3FileQueue::TEvGetNextBatch, HandleGetNextBatchForErrorState);
cFunc(TEvPrivatePrivate::EvRoundRobinStageTimeout, HandleRoundRobinStageTimeout);
cFunc(TEvents::TSystem::Poison, PassAway);
cFunc(TEvents::TSystem::Poison, HandlePoison);
default:
MaybeIssues = TIssues{TIssue{TStringBuilder() << "An event with unknown type has been received: '" << etype << "'"}};
break;
Expand Down Expand Up @@ -603,12 +599,14 @@ class TS3FileQueueActor : public TActorBootstrapped<TS3FileQueueActor> {
}
}

// Handler registered for TEvents::TSystem::Poison in the actor's state functions.
// Calls AnswerPendingRequests() and only then PassAway().
void HandlePoison() {
AnswerPendingRequests();
PassAway();
}

void PassAway() override {
PrintBackTrace();
LOG_D("TS3FileQueueActor", "PassAway");

AnswerPendingRequests();
Objects.clear();
Directories.clear();
TBase::PassAway();
}

Expand Down Expand Up @@ -1004,7 +1002,7 @@ class TS3ReadActor : public TActorBootstrapped<TS3ReadActor>, public IDqComputeA
return FilesRemained && (*FilesRemained == 0);
}

STRICT_STFUNC(StateFunc,
STRICT_STFUNC_EXC(StateFunc,
hFunc(TEvPrivate::TEvReadResult, Handle);
hFunc(TEvPrivate::TEvReadError, Handle);
hFunc(TEvS3FileQueue::TEvObjectPathBatch, HandleObjectPathBatch);
Expand All @@ -1013,11 +1011,19 @@ class TS3ReadActor : public TActorBootstrapped<TS3ReadActor>, public IDqComputeA
hFunc(NYql::NDq::TEvRetryQueuePrivate::TEvRetry, Handle);
hFunc(NActors::TEvInterconnect::TEvNodeDisconnected, Handle);
hFunc(NActors::TEvInterconnect::TEvNodeConnected, Handle);
hFunc(NActors::TEvents::TEvUndelivered, Handle);
, catch (const std::exception& e) {
TIssues issues{TIssue{TStringBuilder() << "An unknown exception has occurred: '" << e.what() << "'"}};
Send(ComputeActorId, new TEvAsyncInputError(InputIndex, issues, NYql::NDqProto::StatusIds::INTERNAL_ERROR));
}
)

void HandleObjectPathBatch(TEvS3FileQueue::TEvObjectPathBatch::TPtr& objectPathBatch) {
if (!FileQueueEvents.OnEventReceived(objectPathBatch)) {
return;
}

Y_ENSURE(IsWaitingFileQueueResponse);
FileQueueEvents.OnEventReceived(objectPathBatch);
IsWaitingFileQueueResponse = false;
auto& objectBatch = objectPathBatch->Get()->Record;
ListedFiles += objectBatch.GetObjectPaths().size();
Expand All @@ -1038,7 +1044,10 @@ class TS3ReadActor : public TActorBootstrapped<TS3ReadActor>, public IDqComputeA
}
}
void HandleObjectPathReadError(TEvS3FileQueue::TEvObjectPathReadError::TPtr& result) {
FileQueueEvents.OnEventReceived(result);
if (!FileQueueEvents.OnEventReceived(result)) {
return;
}

IsFileQueueEmpty = true;
if (!IsConfirmedFileQueueFinish) {
SendPathBatchRequest();
Expand Down Expand Up @@ -1177,10 +1186,18 @@ class TS3ReadActor : public TActorBootstrapped<TS3ReadActor>, public IDqComputeA
}

void Handle(NActors::TEvInterconnect::TEvNodeConnected::TPtr& ev) {
LOG_T("TS3ReadActor","Handle connected FileQueue " << ev->Get()->NodeId);
LOG_T("TS3ReadActor", "Handle connected FileQueue " << ev->Get()->NodeId);
FileQueueEvents.HandleNodeConnected(ev->Get()->NodeId);
}

// Handles a TEvUndelivered notification.
// The event is first offered to FileQueueEvents; if HandleUndelivered() returns
// false, a "FileQueue was lost" issue is sent to the compute actor as
// TEvAsyncInputError with status INTERNAL_ERROR.
void Handle(NActors::TEvents::TEvUndelivered::TPtr& ev) {
    LOG_T("TS3ReadActor", "Handle undelivered FileQueue ");

    if (FileQueueEvents.HandleUndelivered(ev)) {
        // HandleUndelivered() returned true, so no error is reported.
        return;
    }

    TIssues lostQueueIssues{TIssue{TStringBuilder() << "FileQueue was lost"}};
    Send(ComputeActorId, new TEvAsyncInputError(InputIndex, lostQueueIssues, NYql::NDqProto::StatusIds::INTERNAL_ERROR));
}

// IActor & IDqComputeActorAsyncInput
void PassAway() override { // Is called from Compute Actor
LOG_D("TS3ReadActor", "PassAway");
Expand Down Expand Up @@ -2117,13 +2134,17 @@ class TS3ReadCoroImpl : public TActorCoroImpl {
LOG_CORO_D("RunCoroBlockArrowParserOverFile - FINISHED");
}

STRICT_STFUNC(StateFunc,
STRICT_STFUNC_EXC(StateFunc,
hFunc(TEvPrivate::TEvReadStarted, Handle);
hFunc(TEvPrivate::TEvDataPart, Handle);
hFunc(TEvPrivate::TEvReadFinished, Handle);
hFunc(TEvPrivate::TEvContinue, Handle);
hFunc(TEvPrivate::TEvReadResult2, Handle);
hFunc(NActors::TEvents::TEvPoison, Handle);
, catch (const std::exception& e) {
TIssues issues{TIssue{TStringBuilder() << "An unknown exception has occurred: '" << e.what() << "'"}};
Send(ComputeActorId, new IDqComputeActorAsyncInput::TEvAsyncInputError(InputIndex, issues, NYql::NDqProto::StatusIds::INTERNAL_ERROR));
}
)

void ProcessOneEvent() {
Expand Down Expand Up @@ -2836,7 +2857,7 @@ class TS3StreamReadActor : public TActorBootstrapped<TS3StreamReadActor>, public
TActorBootstrapped<TS3StreamReadActor>::PassAway();
}

STRICT_STFUNC(StateFunc,
STRICT_STFUNC_EXC(StateFunc,
hFunc(TEvPrivate::TEvRetryEventFunc, HandleRetry);
hFunc(TEvPrivate::TEvNextBlock, HandleNextBlock);
hFunc(TEvPrivate::TEvNextRecordBatch, HandleNextRecordBatch);
Expand All @@ -2847,11 +2868,19 @@ class TS3StreamReadActor : public TActorBootstrapped<TS3StreamReadActor>, public
hFunc(NYql::NDq::TEvRetryQueuePrivate::TEvRetry, Handle);
hFunc(NActors::TEvInterconnect::TEvNodeDisconnected, Handle);
hFunc(NActors::TEvInterconnect::TEvNodeConnected, Handle);
hFunc(NActors::TEvents::TEvUndelivered, Handle);
, catch (const std::exception& e) {
TIssues issues{TIssue{TStringBuilder() << "An unknown exception has occurred: '" << e.what() << "'"}};
Send(ComputeActorId, new TEvAsyncInputError(InputIndex, issues, NYql::NDqProto::StatusIds::INTERNAL_ERROR));
}
)

void HandleObjectPathBatch(TEvS3FileQueue::TEvObjectPathBatch::TPtr& objectPathBatch) {
if (!FileQueueEvents.OnEventReceived(objectPathBatch)) {
return;
}

Y_ENSURE(IsWaitingFileQueueResponse);
FileQueueEvents.OnEventReceived(objectPathBatch);
IsWaitingFileQueueResponse = false;
auto& objectBatch = objectPathBatch->Get()->Record;
ListedFiles += objectBatch.GetObjectPaths().size();
Expand All @@ -2877,7 +2906,10 @@ class TS3StreamReadActor : public TActorBootstrapped<TS3StreamReadActor>, public
}

void HandleObjectPathReadError(TEvS3FileQueue::TEvObjectPathReadError::TPtr& result) {
FileQueueEvents.OnEventReceived(result);
if (!FileQueueEvents.OnEventReceived(result)) {
return;
}

IsFileQueueEmpty = true;
if (!IsConfirmedFileQueueFinish) {
LOG_T("TS3StreamReadActor", "Sending finish confirmation to FileQueue");
Expand Down Expand Up @@ -2981,8 +3013,7 @@ class TS3StreamReadActor : public TActorBootstrapped<TS3StreamReadActor>, public
FileQueueEvents.OnEventReceived(ev);
}

void Handle(const NYql::NDq::TEvRetryQueuePrivate::TEvRetry::TPtr& ev) {
Y_UNUSED(ev);
void Handle(const NYql::NDq::TEvRetryQueuePrivate::TEvRetry::TPtr&) {
FileQueueEvents.Retry();
}

Expand All @@ -2992,10 +3023,17 @@ class TS3StreamReadActor : public TActorBootstrapped<TS3StreamReadActor>, public
}

void Handle(NActors::TEvInterconnect::TEvNodeConnected::TPtr& ev) {
LOG_T("TS3StreamReadActor","Handle connected FileQueue " << ev->Get()->NodeId);
LOG_T("TS3StreamReadActor", "Handle connected FileQueue " << ev->Get()->NodeId);
FileQueueEvents.HandleNodeConnected(ev->Get()->NodeId);
}

// Handles a TEvUndelivered notification.
// The event is first offered to FileQueueEvents; if HandleUndelivered() returns
// false, a "FileQueue was lost" issue is sent to the compute actor as
// TEvAsyncInputError with status INTERNAL_ERROR.
void Handle(NActors::TEvents::TEvUndelivered::TPtr& ev) {
    LOG_T("TS3StreamReadActor", "Handle undelivered FileQueue ");

    if (FileQueueEvents.HandleUndelivered(ev)) {
        // HandleUndelivered() returned true, so no error is reported.
        return;
    }

    TIssues lostQueueIssues{TIssue{TStringBuilder() << "FileQueue was lost"}};
    Send(ComputeActorId, new TEvAsyncInputError(InputIndex, lostQueueIssues, NYql::NDqProto::StatusIds::INTERNAL_ERROR));
}
// Returns true only when all three conditions hold: Blocks is empty,
// ListedFiles equals CompletedFiles, and IsFileQueueEmpty is set.
bool LastFileWasProcessed() const {
    if (!Blocks.empty()) {
        return false;
    }
    if (!(ListedFiles == CompletedFiles)) {
        return false;
    }
    return IsFileQueueEmpty;
}
Expand Down
Loading

0 comments on commit b50b536

Please sign in to comment.