Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
Co-authored-by: Timur Sufiyanov <fa-luke16@mail.ru>
Co-authored-by: Oleg Doronin <dorooleg@yandex.ru>
  • Loading branch information
3 people authored May 30, 2024
1 parent d58be63 commit aaf8d3f
Show file tree
Hide file tree
Showing 29 changed files with 538 additions and 155 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -455,7 +455,7 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvGetTaskRequ
for (const auto& connection: task.Internal.connection()) {
const auto serviceAccountId = ExtractServiceAccountId(connection);
if (!serviceAccountId) {
continue;
continue;
}
auto* account = newTask->add_service_accounts();
account->set_value(serviceAccountId);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,20 +1,20 @@
version: '3.4'
services:
clickhouse:
image: clickhouse/clickhouse-server:23-alpine@sha256:b078c1cd294632afa2aeba3530e7ba2e568513da23304354f455a25fab575c06
container_name: fq-tests-ch-clickhouse
environment:
CLICKHOUSE_DB: db
CLICKHOUSE_USER: user
CLICKHOUSE_DEFAULT_ACCESS_MANAGEMENT: 1
CLICKHOUSE_PASSWORD: password
CLICKHOUSE_USER: user
image: clickhouse/clickhouse-server:23-alpine@sha256:b078c1cd294632afa2aeba3530e7ba2e568513da23304354f455a25fab575c06
ports:
- 9000
- 8123
- 9000
- 8123
fq-connector-go:
container_name: fq-tests-ch-fq-connector-go
image: ghcr.io/ydb-platform/fq-connector-go:v0.2.5@sha256:7f086ce3869b84a59fd76a10a9de8125c0d382915e956d34832105e03829a61b
volumes:
- ../../fq-connector-go/:/opt/ydb/cfg/
image: ghcr.io/ydb-platform/fq-connector-go:v0.2.20@sha256:a1771f348dc8be6219865e332f788429907cdfec3677b3e98f0bc6f7dd542dc6
ports:
- 2130
- 2130
volumes:
- ../../fq-connector-go/:/opt/ydb/cfg/
version: "3.4"
Original file line number Diff line number Diff line change
@@ -1,19 +1,26 @@
version: '3.4'
services:
fq-connector-go:
container_name: fq-tests-pg-fq-connector-go
image: ghcr.io/ydb-platform/fq-connector-go:v0.2.20@sha256:a1771f348dc8be6219865e332f788429907cdfec3677b3e98f0bc6f7dd542dc6
ports:
- 2130
volumes:
- ../../fq-connector-go/:/opt/ydb/cfg/
postgresql:
image: postgres:15-bullseye@sha256:3411b9f2e5239cd7867f34fcf22fe964230f7d447a71d63c283e3593d3f84085
command:
- postgres
- -c
- log_statement=all
- -c
- log_connections=on
- -c
- log_disconnections=on
container_name: fq-tests-pg-postgresql
environment:
POSTGRES_DB: db
POSTGRES_USER: user
POSTGRES_PASSWORD: password
command: ["postgres", "-c", "log_statement=all", "-c", "log_connections=on", "-c", "log_disconnections=on"]
ports:
- 5432
fq-connector-go:
container_name: fq-tests-pg-fq-connector-go
image: ghcr.io/ydb-platform/fq-connector-go:v0.2.5@sha256:7f086ce3869b84a59fd76a10a9de8125c0d382915e956d34832105e03829a61b
volumes:
- ../../fq-connector-go/:/opt/ydb/cfg/
POSTGRES_USER: user
image: postgres:15-bullseye@sha256:3411b9f2e5239cd7867f34fcf22fe964230f7d447a71d63c283e3593d3f84085
ports:
- 2130
- 5432
version: "3.4"
Original file line number Diff line number Diff line change
@@ -1,25 +1,24 @@
version: '3.4'
services:
ydb:
image: cr.yandex/yc/yandex-docker-local-ydb:23.3.17@sha256:bf9001c849cc6c4c9b56f32f5440a6e8390c4e841937c9f9caf929fd70a689c8
container_name: fq-tests-ydb-ydb
hostname: fq-tests-ydb-ydb
environment:
YDB_DEFAULT_LOG_LEVEL: INFO
POSTGRES_USER: user
POSTGRES_PASSWORD: password
volumes:
- ./init/init_ydb:/init_ydb
- ./init/01_basic.sh:/01_basic.sh

fq-connector-go:
image: ghcr.io/ydb-platform/fq-connector-go:v0.2.12@sha256:dd2483ba061e25e8ee645bcc64cae8b8a0a93dba6772eb4b8ab0a0aab4b8dd48
container_name: fq-tests-ydb-fq-connector-go
volumes:
- ../../fq-connector-go/:/opt/ydb/cfg/
ports:
- 2130
command: >
command: |
sh -c "
echo \"$$(dig fq-tests-ydb-ydb +short) fq-tests-ydb-ydb\" >> /etc/hosts; cat /etc/hosts;
/opt/ydb/bin/fq-connector-go server -c /opt/ydb/cfg/fq-connector-go.yaml"
container_name: fq-tests-ydb-fq-connector-go
image: ghcr.io/ydb-platform/fq-connector-go:v0.2.20@sha256:a1771f348dc8be6219865e332f788429907cdfec3677b3e98f0bc6f7dd542dc6
ports:
- 2130
volumes:
- ../../fq-connector-go/:/opt/ydb/cfg/
ydb:
container_name: fq-tests-ydb-ydb
environment:
POSTGRES_PASSWORD: password
POSTGRES_USER: user
YDB_DEFAULT_LOG_LEVEL: DEBUG
hostname: fq-tests-ydb-ydb
image: ghcr.io/ydb-platform/local-ydb:latest@sha256:9045e00afec1923dc3277564c7b2f829087c2115f45f18e1d38b80bb89f98be6
volumes:
- ./init/init_ydb:/init_ydb
- ./init/01_basic.sh:/01_basic.sh
version: "3.4"
Original file line number Diff line number Diff line change
@@ -1,30 +1,37 @@
version: '3.4'
services:
clickhouse:
image: clickhouse/clickhouse-server:23-alpine@sha256:b078c1cd294632afa2aeba3530e7ba2e568513da23304354f455a25fab575c06
container_name: fq-tests-join-clickhouse
environment:
CLICKHOUSE_DB: db
CLICKHOUSE_USER: user
CLICKHOUSE_DEFAULT_ACCESS_MANAGEMENT: 1
CLICKHOUSE_PASSWORD: password
CLICKHOUSE_USER: user
image: clickhouse/clickhouse-server:23-alpine@sha256:b078c1cd294632afa2aeba3530e7ba2e568513da23304354f455a25fab575c06
ports:
- 9000
- 8123
- 9000
- 8123
fq-connector-go:
container_name: fq-tests-join-fq-connector-go
image: ghcr.io/ydb-platform/fq-connector-go:v0.2.20@sha256:a1771f348dc8be6219865e332f788429907cdfec3677b3e98f0bc6f7dd542dc6
ports:
- 2130
volumes:
- ../fq-connector-go/:/opt/ydb/cfg/
postgresql:
image: postgres:15-bullseye@sha256:3411b9f2e5239cd7867f34fcf22fe964230f7d447a71d63c283e3593d3f84085
command:
- postgres
- -c
- log_statement=all
- -c
- log_connections=on
- -c
- log_disconnections=on
container_name: fq-tests-join-postgresql
environment:
POSTGRES_DB: db
POSTGRES_USER: user
POSTGRES_PASSWORD: password
command: ["postgres", "-c", "log_statement=all", "-c", "log_connections=on", "-c", "log_disconnections=on"]
ports:
- 5432
fq-connector-go:
container_name: fq-tests-join-fq-connector-go
image: ghcr.io/ydb-platform/fq-connector-go:v0.2.5@sha256:7f086ce3869b84a59fd76a10a9de8125c0d382915e956d34832105e03829a61b
volumes:
- ../fq-connector-go/:/opt/ydb/cfg/
POSTGRES_USER: user
image: postgres:15-bullseye@sha256:3411b9f2e5239cd7867f34fcf22fe964230f7d447a71d63c283e3593d3f84085
ports:
- 2130
- 5432
version: "3.4"
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,8 @@ def get_cluster_name(self, data_source_kind: EDataSourceKind) -> str:
return self.clickhouse.cluster_name
case EDataSourceKind.POSTGRESQL:
return self.postgresql.cluster_name
case EDataSourceKind.YDB:
return self.ydb.cluster_name
case _:
raise Exception(f'invalid data source: {EDataSourceKind.Name(data_source_kind)}')

Expand Down
1 change: 1 addition & 0 deletions ydb/library/yql/providers/generic/provider/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ PEERDIR(
ydb/library/yql/providers/generic/proto
ydb/library/yql/providers/generic/connector/api/common
ydb/library/yql/providers/generic/connector/libcpp
ydb/library/yql/providers/result/expr_nodes
ydb/library/yql/utils/plan
ydb/public/sdk/cpp/client/ydb_types/credentials
)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,27 +1,118 @@
#include "yql_generic_provider_impl.h"
#include "yql_generic_state.h"

#include <ydb/library/yql/core/expr_nodes/yql_expr_nodes.h>
#include <ydb/library/yql/providers/common/provider/yql_data_provider_impl.h>
#include <ydb/library/yql/dq/expr_nodes/dq_expr_nodes.h>
#include <ydb/library/yql/providers/common/provider/yql_provider.h>
#include <ydb/library/yql/providers/common/provider/yql_provider_names.h>
#include <ydb/library/yql/providers/common/provider/yql_data_provider_impl.h>
#include <ydb/library/yql/providers/common/transform/yql_exec.h>
#include <ydb/library/yql/providers/generic/expr_nodes/yql_generic_expr_nodes.h>
#include <ydb/library/yql/providers/result/expr_nodes/yql_res_expr_nodes.h>
#include <ydb/library/yql/utils/log/log.h>

namespace NYql {

using namespace NNodes;

class TGenericDataSinkExecTransformer: public TExecTransformerBase {
public:
TGenericDataSinkExecTransformer(TGenericState::TPtr state)
: State_(state)
{
AddHandler({TCoCommit::CallableName()}, RequireFirst(), Pass());
}

private:
TGenericState::TPtr State_;
};
namespace {

class TGenericDataSinkExecTransformer: public TExecTransformerBase {
public:
TGenericDataSinkExecTransformer(TGenericState::TPtr state)
: State_(state)
{
AddHandler({TCoCommit::CallableName()}, RequireFirst(), Hndl(&TGenericDataSinkExecTransformer::HandleCommit));
}

private:
TStatusCallbackPair HandleCommit(const TExprNode::TPtr& input, TExprContext& ctx) {
if (TDqQuery::Match(input->Child(TCoCommit::idx_World))) {
return DelegateExecutionToDqProvider(input->ChildPtr(TCoCommit::idx_World), input, ctx);
} else { // Pass
input->SetState(TExprNode::EState::ExecutionComplete);
input->SetResult(ctx.NewWorld(input->Pos()));
return SyncOk();
}
}

TStatusCallbackPair DelegateExecutionToDqProvider(const TExprNode::TPtr& input, const TExprNode::TPtr& originInput, TExprContext& ctx) {
YQL_CLOG(INFO, ProviderGeneric) << "Delegate execution of " << input->Content() << " to DQ provider.";
auto delegatedNode = Build<TPull>(ctx, input->Pos())
.Input(input)
.BytesLimit()
.Value(TString())
.Build()
.RowsLimit()
.Value(TString("0"))
.Build()
.FormatDetails()
.Value(ToString((ui32)NYson::EYsonFormat::Binary))
.Build()
.Settings()
.Build()
.Format()
.Value(ToString("0"))
.Build()
.PublicId()
.Value("id")
.Build()
.Discard()
.Value(ToString(true))
.Build()
.Origin(originInput)
.Done()
.Ptr();

for (auto idx : {TResOrPullBase::idx_BytesLimit, TResOrPullBase::idx_RowsLimit, TResOrPullBase::idx_FormatDetails,
TResOrPullBase::idx_Format, TResOrPullBase::idx_PublicId, TResOrPullBase::idx_Discard}) {
delegatedNode->Child(idx)->SetTypeAnn(ctx.MakeType<TUnitExprType>());
delegatedNode->Child(idx)->SetState(TExprNode::EState::ConstrComplete);
}

delegatedNode->SetTypeAnn(originInput->GetTypeAnn());
delegatedNode->SetState(TExprNode::EState::ConstrComplete);
originInput->SetState(TExprNode::EState::ExecutionInProgress);

const auto dqProvider = State_->Types->DataSourceMap.FindPtr(DqProviderName);

TExprNode::TPtr delegatedNodeOutput;
if (const auto status = dqProvider->Get()->GetCallableExecutionTransformer().Transform(delegatedNode, delegatedNodeOutput, ctx); status.Level != TStatus::Async) {
YQL_ENSURE(status.Level != TStatus::Ok, "Asynchronous execution is expected in a happy path.");
return SyncStatus(status);
}

auto dqFuture = dqProvider->Get()->GetCallableExecutionTransformer().GetAsyncFuture(*delegatedNode);

TAsyncTransformCallbackFuture callbackFuture = dqFuture.Apply(
[dqProvider, delegatedNode](const NThreading::TFuture<void>& completedFuture) {
return TAsyncTransformCallback(
[completedFuture, dqProvider, delegatedNode](const TExprNode::TPtr& input, TExprNode::TPtr& output, TExprContext& ctx) {
completedFuture.GetValue();
output = input;
TExprNode::TPtr delegatedNodeOutput;
auto dqWriteStatus = dqProvider->Get()->GetCallableExecutionTransformer().ApplyAsyncChanges(delegatedNode, delegatedNodeOutput, ctx);

YQL_ENSURE(dqWriteStatus != TStatus::Async, "ApplyAsyncChanges should not return Async.");

if (dqWriteStatus == TStatus::Repeat)
output->SetState(TExprNode::EState::ExecutionRequired);

if (dqWriteStatus != TStatus::Ok)
return dqWriteStatus;

output->SetState(TExprNode::EState::ExecutionComplete);
output->SetResult(ctx.NewAtom(input->Pos(), "DQ_completed"));
return IGraphTransformer::TStatus(IGraphTransformer::TStatus::Ok);
});
});

return std::make_pair(IGraphTransformer::TStatus::Async, callbackFuture);
}

const TGenericState::TPtr State_;
};

}

THolder<TExecTransformerBase> CreateGenericDataSinkExecTransformer(TGenericState::TPtr state) {
return THolder(new TGenericDataSinkExecTransformer(state));
Expand Down
11 changes: 9 additions & 2 deletions ydb/tests/fq/generic/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,10 @@
from ydb.tests.tools.fq_runner.kikimr_utils import YQv2Extension
from ydb.tests.tools.fq_runner.kikimr_utils import TokenAccessorExtension
from ydb.tests.tools.fq_runner.kikimr_utils import MDBExtension
from ydb.tests.tools.fq_runner.kikimr_utils import YdbMvpExtension
from ydb.tests.tools.fq_runner.kikimr_utils import start_kikimr

from utils.settings import Settings
from ydb.tests.fq.generic.utils.settings import Settings


@pytest.fixture
Expand All @@ -17,11 +18,17 @@ def settings() -> Settings:


@pytest.fixture
def kikimr(request: pytest.FixtureRequest, settings: Settings, yq_version: str):
def mvp_external_ydb_endpoint(request) -> str:
return request.param["endpoint"] if request is not None and hasattr(request, "param") else None


@pytest.fixture
def kikimr(request: pytest.FixtureRequest, settings: Settings, yq_version: str, mvp_external_ydb_endpoint: str):
kikimr_extensions = [
ConnectorExtension(settings.connector.grpc_host, settings.connector.grpc_port, False),
TokenAccessorExtension(settings.token_accessor_mock.endpoint, settings.token_accessor_mock.hmac_secret_file),
MDBExtension(settings.mdb_mock.endpoint),
YdbMvpExtension(mvp_external_ydb_endpoint),
YQv2Extension(yq_version),
]
with start_kikimr(request, kikimr_extensions) as kikimr:
Expand Down
Loading

0 comments on commit aaf8d3f

Please sign in to comment.