diff --git a/ydb/core/fq/libs/control_plane_storage/internal/task_get.cpp b/ydb/core/fq/libs/control_plane_storage/internal/task_get.cpp index 44127db0165a..6f8babcdfbaa 100644 --- a/ydb/core/fq/libs/control_plane_storage/internal/task_get.cpp +++ b/ydb/core/fq/libs/control_plane_storage/internal/task_get.cpp @@ -455,7 +455,7 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvGetTaskRequ for (const auto& connection: task.Internal.connection()) { const auto serviceAccountId = ExtractServiceAccountId(connection); if (!serviceAccountId) { - continue; + continue; } auto* account = newTask->add_service_accounts(); account->set_value(serviceAccountId); diff --git a/ydb/library/yql/providers/generic/connector/tests/datasource/clickhouse/docker-compose.yml b/ydb/library/yql/providers/generic/connector/tests/datasource/clickhouse/docker-compose.yml index 16f0eb3ee86b..d55f3ec04c5d 100644 --- a/ydb/library/yql/providers/generic/connector/tests/datasource/clickhouse/docker-compose.yml +++ b/ydb/library/yql/providers/generic/connector/tests/datasource/clickhouse/docker-compose.yml @@ -1,20 +1,20 @@ -version: '3.4' services: clickhouse: - image: clickhouse/clickhouse-server:23-alpine@sha256:b078c1cd294632afa2aeba3530e7ba2e568513da23304354f455a25fab575c06 container_name: fq-tests-ch-clickhouse environment: CLICKHOUSE_DB: db - CLICKHOUSE_USER: user CLICKHOUSE_DEFAULT_ACCESS_MANAGEMENT: 1 CLICKHOUSE_PASSWORD: password + CLICKHOUSE_USER: user + image: clickhouse/clickhouse-server:23-alpine@sha256:b078c1cd294632afa2aeba3530e7ba2e568513da23304354f455a25fab575c06 ports: - - 9000 - - 8123 + - 9000 + - 8123 fq-connector-go: container_name: fq-tests-ch-fq-connector-go - image: ghcr.io/ydb-platform/fq-connector-go:v0.2.5@sha256:7f086ce3869b84a59fd76a10a9de8125c0d382915e956d34832105e03829a61b - volumes: - - ../../fq-connector-go/:/opt/ydb/cfg/ + image: ghcr.io/ydb-platform/fq-connector-go:v0.2.20@sha256:a1771f348dc8be6219865e332f788429907cdfec3677b3e98f0bc6f7dd542dc6 ports: - - 2130 + - 2130 + volumes: + - ../../fq-connector-go/:/opt/ydb/cfg/ +version: "3.4" diff --git a/ydb/library/yql/providers/generic/connector/tests/datasource/postgresql/docker-compose.yml b/ydb/library/yql/providers/generic/connector/tests/datasource/postgresql/docker-compose.yml index 5808d9191470..1af0821daa9d 100644 --- a/ydb/library/yql/providers/generic/connector/tests/datasource/postgresql/docker-compose.yml +++ b/ydb/library/yql/providers/generic/connector/tests/datasource/postgresql/docker-compose.yml @@ -1,19 +1,26 @@ -version: '3.4' services: + fq-connector-go: + container_name: fq-tests-pg-fq-connector-go + image: ghcr.io/ydb-platform/fq-connector-go:v0.2.20@sha256:a1771f348dc8be6219865e332f788429907cdfec3677b3e98f0bc6f7dd542dc6 + ports: + - 2130 + volumes: + - ../../fq-connector-go/:/opt/ydb/cfg/ postgresql: - image: postgres:15-bullseye@sha256:3411b9f2e5239cd7867f34fcf22fe964230f7d447a71d63c283e3593d3f84085 + command: + - postgres + - -c + - log_statement=all + - -c + - log_connections=on + - -c + - log_disconnections=on container_name: fq-tests-pg-postgresql environment: POSTGRES_DB: db - POSTGRES_USER: user POSTGRES_PASSWORD: password - command: ["postgres", "-c", "log_statement=all", "-c", "log_connections=on", "-c", "log_disconnections=on"] - ports: - - 5432 - fq-connector-go: - container_name: fq-tests-pg-fq-connector-go - image: ghcr.io/ydb-platform/fq-connector-go:v0.2.5@sha256:7f086ce3869b84a59fd76a10a9de8125c0d382915e956d34832105e03829a61b - volumes: - - ../../fq-connector-go/:/opt/ydb/cfg/ + POSTGRES_USER: user + image: postgres:15-bullseye@sha256:3411b9f2e5239cd7867f34fcf22fe964230f7d447a71d63c283e3593d3f84085 ports: - - 2130 + - 5432 +version: "3.4" diff --git a/ydb/library/yql/providers/generic/connector/tests/datasource/ydb/docker-compose.yml b/ydb/library/yql/providers/generic/connector/tests/datasource/ydb/docker-compose.yml index 93a54ce2ffc8..0bffb9bc8337 100644 --- a/ydb/library/yql/providers/generic/connector/tests/datasource/ydb/docker-compose.yml +++ b/ydb/library/yql/providers/generic/connector/tests/datasource/ydb/docker-compose.yml @@ -1,25 +1,24 @@ -version: '3.4' services: - ydb: - image: cr.yandex/yc/yandex-docker-local-ydb:23.3.17@sha256:bf9001c849cc6c4c9b56f32f5440a6e8390c4e841937c9f9caf929fd70a689c8 - container_name: fq-tests-ydb-ydb - hostname: fq-tests-ydb-ydb - environment: - YDB_DEFAULT_LOG_LEVEL: INFO - POSTGRES_USER: user - POSTGRES_PASSWORD: password - volumes: - - ./init/init_ydb:/init_ydb - - ./init/01_basic.sh:/01_basic.sh - fq-connector-go: - image: ghcr.io/ydb-platform/fq-connector-go:v0.2.12@sha256:dd2483ba061e25e8ee645bcc64cae8b8a0a93dba6772eb4b8ab0a0aab4b8dd48 - container_name: fq-tests-ydb-fq-connector-go - volumes: - - ../../fq-connector-go/:/opt/ydb/cfg/ - ports: - - 2130 - command: > + command: | sh -c " echo \"$$(dig fq-tests-ydb-ydb +short) fq-tests-ydb-ydb\" >> /etc/hosts; cat /etc/hosts; /opt/ydb/bin/fq-connector-go server -c /opt/ydb/cfg/fq-connector-go.yaml" + container_name: fq-tests-ydb-fq-connector-go + image: ghcr.io/ydb-platform/fq-connector-go:v0.2.20@sha256:a1771f348dc8be6219865e332f788429907cdfec3677b3e98f0bc6f7dd542dc6 + ports: + - 2130 + volumes: + - ../../fq-connector-go/:/opt/ydb/cfg/ + ydb: + container_name: fq-tests-ydb-ydb + environment: + POSTGRES_PASSWORD: password + POSTGRES_USER: user + YDB_DEFAULT_LOG_LEVEL: DEBUG + hostname: fq-tests-ydb-ydb + image: ghcr.io/ydb-platform/local-ydb:latest@sha256:9045e00afec1923dc3277564c7b2f829087c2115f45f18e1d38b80bb89f98be6 + volumes: + - ./init/init_ydb:/init_ydb + - ./init/01_basic.sh:/01_basic.sh +version: "3.4" diff --git a/ydb/library/yql/providers/generic/connector/tests/join/docker-compose.yml b/ydb/library/yql/providers/generic/connector/tests/join/docker-compose.yml index 609c4a4942e6..f0a72d49c59b 100644 --- a/ydb/library/yql/providers/generic/connector/tests/join/docker-compose.yml +++ b/ydb/library/yql/providers/generic/connector/tests/join/docker-compose.yml @@ -1,30 +1,37 @@ -version: '3.4' services: clickhouse: - image: clickhouse/clickhouse-server:23-alpine@sha256:b078c1cd294632afa2aeba3530e7ba2e568513da23304354f455a25fab575c06 container_name: fq-tests-join-clickhouse environment: CLICKHOUSE_DB: db - CLICKHOUSE_USER: user CLICKHOUSE_DEFAULT_ACCESS_MANAGEMENT: 1 CLICKHOUSE_PASSWORD: password + CLICKHOUSE_USER: user + image: clickhouse/clickhouse-server:23-alpine@sha256:b078c1cd294632afa2aeba3530e7ba2e568513da23304354f455a25fab575c06 ports: - - 9000 - - 8123 + - 9000 + - 8123 + fq-connector-go: + container_name: fq-tests-join-fq-connector-go + image: ghcr.io/ydb-platform/fq-connector-go:v0.2.20@sha256:a1771f348dc8be6219865e332f788429907cdfec3677b3e98f0bc6f7dd542dc6 + ports: + - 2130 + volumes: + - ../fq-connector-go/:/opt/ydb/cfg/ postgresql: - image: postgres:15-bullseye@sha256:3411b9f2e5239cd7867f34fcf22fe964230f7d447a71d63c283e3593d3f84085 + command: + - postgres + - -c + - log_statement=all + - -c + - log_connections=on + - -c + - log_disconnections=on container_name: fq-tests-join-postgresql environment: POSTGRES_DB: db - POSTGRES_USER: user POSTGRES_PASSWORD: password - command: ["postgres", "-c", "log_statement=all", "-c", "log_connections=on", "-c", "log_disconnections=on"] - ports: - - 5432 - fq-connector-go: - container_name: fq-tests-join-fq-connector-go - image: ghcr.io/ydb-platform/fq-connector-go:v0.2.5@sha256:7f086ce3869b84a59fd76a10a9de8125c0d382915e956d34832105e03829a61b - volumes: - - ../fq-connector-go/:/opt/ydb/cfg/ + POSTGRES_USER: user + image: postgres:15-bullseye@sha256:3411b9f2e5239cd7867f34fcf22fe964230f7d447a71d63c283e3593d3f84085 ports: - - 2130 + - 5432 +version: "3.4" diff --git a/ydb/library/yql/providers/generic/connector/tests/utils/settings.py b/ydb/library/yql/providers/generic/connector/tests/utils/settings.py index 51971998665d..e30c045f2610 100644 --- a/ydb/library/yql/providers/generic/connector/tests/utils/settings.py +++ b/ydb/library/yql/providers/generic/connector/tests/utils/settings.py @@ -129,6 +129,8 @@ def get_cluster_name(self, data_source_kind: EDataSourceKind) -> str: return self.clickhouse.cluster_name case EDataSourceKind.POSTGRESQL: return self.postgresql.cluster_name + case EDataSourceKind.YDB: + return self.ydb.cluster_name case _: raise Exception(f'invalid data source: {EDataSourceKind.Name(data_source_kind)}') diff --git a/ydb/library/yql/providers/generic/provider/ya.make b/ydb/library/yql/providers/generic/provider/ya.make index 55dd70b153e2..6c7a5ca000d3 100644 --- a/ydb/library/yql/providers/generic/provider/ya.make +++ b/ydb/library/yql/providers/generic/provider/ya.make @@ -55,6 +55,7 @@ PEERDIR( ydb/library/yql/providers/generic/proto ydb/library/yql/providers/generic/connector/api/common ydb/library/yql/providers/generic/connector/libcpp + ydb/library/yql/providers/result/expr_nodes ydb/library/yql/utils/plan ydb/public/sdk/cpp/client/ydb_types/credentials ) diff --git a/ydb/library/yql/providers/generic/provider/yql_generic_datasink_execution.cpp b/ydb/library/yql/providers/generic/provider/yql_generic_datasink_execution.cpp index 08d8c43d9e6a..f352752397ce 100644 --- a/ydb/library/yql/providers/generic/provider/yql_generic_datasink_execution.cpp +++ b/ydb/library/yql/providers/generic/provider/yql_generic_datasink_execution.cpp @@ -1,27 +1,118 @@ -#include "yql_generic_provider_impl.h" +#include "yql_generic_state.h" #include -#include +#include #include #include +#include +#include #include +#include #include namespace NYql { using namespace NNodes; - class TGenericDataSinkExecTransformer: public TExecTransformerBase { - public: - TGenericDataSinkExecTransformer(TGenericState::TPtr state) - : State_(state) - { - AddHandler({TCoCommit::CallableName()}, RequireFirst(), Pass()); - } - - private: - TGenericState::TPtr State_; - }; + namespace { + + class TGenericDataSinkExecTransformer: public TExecTransformerBase { + public: + TGenericDataSinkExecTransformer(TGenericState::TPtr state) + : State_(state) + { + AddHandler({TCoCommit::CallableName()}, RequireFirst(), Hndl(&TGenericDataSinkExecTransformer::HandleCommit)); + } + + private: + TStatusCallbackPair HandleCommit(const TExprNode::TPtr& input, TExprContext& ctx) { + if (TDqQuery::Match(input->Child(TCoCommit::idx_World))) { + return DelegateExecutionToDqProvider(input->ChildPtr(TCoCommit::idx_World), input, ctx); + } else { // Pass + input->SetState(TExprNode::EState::ExecutionComplete); + input->SetResult(ctx.NewWorld(input->Pos())); + return SyncOk(); + } + } + + TStatusCallbackPair DelegateExecutionToDqProvider(const TExprNode::TPtr& input, const TExprNode::TPtr& originInput, TExprContext& ctx) { + YQL_CLOG(INFO, ProviderGeneric) << "Delegate execution of " << input->Content() << " to DQ provider."; + auto delegatedNode = Build(ctx, input->Pos()) + .Input(input) + .BytesLimit() + .Value(TString()) + .Build() + .RowsLimit() + .Value(TString("0")) + .Build() + .FormatDetails() + .Value(ToString((ui32)NYson::EYsonFormat::Binary)) + .Build() + .Settings() + .Build() + .Format() + .Value(ToString("0")) + .Build() + .PublicId() + .Value("id") + .Build() + .Discard() + .Value(ToString(true)) + .Build() + .Origin(originInput) + .Done() + .Ptr(); + + for (auto idx : {TResOrPullBase::idx_BytesLimit, TResOrPullBase::idx_RowsLimit, TResOrPullBase::idx_FormatDetails, + TResOrPullBase::idx_Format, TResOrPullBase::idx_PublicId, TResOrPullBase::idx_Discard}) { + delegatedNode->Child(idx)->SetTypeAnn(ctx.MakeType()); + delegatedNode->Child(idx)->SetState(TExprNode::EState::ConstrComplete); + } + + delegatedNode->SetTypeAnn(originInput->GetTypeAnn()); + delegatedNode->SetState(TExprNode::EState::ConstrComplete); + originInput->SetState(TExprNode::EState::ExecutionInProgress); + + const auto dqProvider = State_->Types->DataSourceMap.FindPtr(DqProviderName); + + TExprNode::TPtr delegatedNodeOutput; + if (const auto status = dqProvider->Get()->GetCallableExecutionTransformer().Transform(delegatedNode, delegatedNodeOutput, ctx); status.Level != TStatus::Async) { + YQL_ENSURE(status.Level != TStatus::Ok, "Asynchronous execution is expected in a happy path."); + return SyncStatus(status); + } + + auto dqFuture = dqProvider->Get()->GetCallableExecutionTransformer().GetAsyncFuture(*delegatedNode); + + TAsyncTransformCallbackFuture callbackFuture = dqFuture.Apply( + [dqProvider, delegatedNode](const NThreading::TFuture& completedFuture) { + return TAsyncTransformCallback( + [completedFuture, dqProvider, delegatedNode](const TExprNode::TPtr& input, TExprNode::TPtr& output, TExprContext& ctx) { + completedFuture.GetValue(); + output = input; + TExprNode::TPtr delegatedNodeOutput; + auto dqWriteStatus = dqProvider->Get()->GetCallableExecutionTransformer().ApplyAsyncChanges(delegatedNode, delegatedNodeOutput, ctx); + + YQL_ENSURE(dqWriteStatus != TStatus::Async, "ApplyAsyncChanges should not return Async."); + + if (dqWriteStatus == TStatus::Repeat) + output->SetState(TExprNode::EState::ExecutionRequired); + + if (dqWriteStatus != TStatus::Ok) + return dqWriteStatus; + + output->SetState(TExprNode::EState::ExecutionComplete); + output->SetResult(ctx.NewAtom(input->Pos(), "DQ_completed")); + return IGraphTransformer::TStatus(IGraphTransformer::TStatus::Ok); + }); + }); + + return std::make_pair(IGraphTransformer::TStatus::Async, callbackFuture); + } + + const TGenericState::TPtr State_; + }; + + } THolder CreateGenericDataSinkExecTransformer(TGenericState::TPtr state) { return THolder(new TGenericDataSinkExecTransformer(state)); diff --git a/ydb/tests/fq/generic/conftest.py b/ydb/tests/fq/generic/conftest.py index e21444a3efc2..492a6c6ebd29 100644 --- a/ydb/tests/fq/generic/conftest.py +++ b/ydb/tests/fq/generic/conftest.py @@ -6,9 +6,10 @@ from ydb.tests.tools.fq_runner.kikimr_utils import YQv2Extension from ydb.tests.tools.fq_runner.kikimr_utils import TokenAccessorExtension from ydb.tests.tools.fq_runner.kikimr_utils import MDBExtension +from ydb.tests.tools.fq_runner.kikimr_utils import YdbMvpExtension from ydb.tests.tools.fq_runner.kikimr_utils import start_kikimr -from utils.settings import Settings +from ydb.tests.fq.generic.utils.settings import Settings @pytest.fixture @@ -17,11 +18,17 @@ def settings() -> Settings: @pytest.fixture -def kikimr(request: pytest.FixtureRequest, settings: Settings, yq_version: str): +def mvp_external_ydb_endpoint(request) -> str: + return request.param["endpoint"] if request is not None and hasattr(request, "param") else None + + +@pytest.fixture +def kikimr(request: pytest.FixtureRequest, settings: Settings, yq_version: str, mvp_external_ydb_endpoint: str): kikimr_extensions = [ ConnectorExtension(settings.connector.grpc_host, settings.connector.grpc_port, False), TokenAccessorExtension(settings.token_accessor_mock.endpoint, settings.token_accessor_mock.hmac_secret_file), MDBExtension(settings.mdb_mock.endpoint), + YdbMvpExtension(mvp_external_ydb_endpoint), YQv2Extension(yq_version), ] with start_kikimr(request, kikimr_extensions) as kikimr: diff --git a/ydb/tests/fq/generic/docker-compose.yml b/ydb/tests/fq/generic/docker-compose.yml index 5bfdd752ce71..327bbbff9dcd 100644 --- a/ydb/tests/fq/generic/docker-compose.yml +++ b/ydb/tests/fq/generic/docker-compose.yml @@ -1,28 +1,42 @@ -version: '3.4' services: - postgresql: - image: "postgres:15-bullseye@sha256:3411b9f2e5239cd7867f34fcf22fe964230f7d447a71d63c283e3593d3f84085" - # to be able to run tests by different users on the same machine we set prefix to ${USER} - container_name: ${USER}_ydb_tests_fq_generic_postgresql - environment: - POSTGRES_DB: db - POSTGRES_USER: user - POSTGRES_PASSWORD: password - volumes: - - ./postgresql:/docker-entrypoint-initdb.d - command: -p 6432 clickhouse: - image: "clickhouse/clickhouse-server:23-alpine@sha256:b078c1cd294632afa2aeba3530e7ba2e568513da23304354f455a25fab575c06" - container_name: ${USER}_ydb_tests_fq_generic_clickhouse + container_name: tests-fq-generic-clickhouse environment: CLICKHOUSE_DB: db - CLICKHOUSE_USER: user CLICKHOUSE_DEFAULT_ACCESS_MANAGEMENT: 1 CLICKHOUSE_PASSWORD: password + CLICKHOUSE_USER: user + image: clickhouse/clickhouse-server:23-alpine@sha256:b078c1cd294632afa2aeba3530e7ba2e568513da23304354f455a25fab575c06 volumes: - - ./clickhouse:/docker-entrypoint-initdb.d - connector: - image: "ghcr.io/ydb-platform/fq-connector-go:v0.1.1-rc.2@sha256:e5c2d86bce9cb43420eed0ed534afe760fb90ad41229dbbf34af28023b219af3" - container_name: ${USER}_ydb_tests_fq_generic_connector + - ./clickhouse:/docker-entrypoint-initdb.d + fq-connector-go: + command: | + sh -c " + echo \"$$(dig tests-fq-generic-ydb +short) tests-fq-generic-ydb\" >> /etc/hosts; cat /etc/hosts; + /opt/ydb/bin/fq-connector-go server -c /opt/ydb/cfg/fq-connector-go.yaml" + container_name: tests-fq-generic-fq-connector-go + image: ghcr.io/ydb-platform/fq-connector-go:v0.2.20@sha256:a1771f348dc8be6219865e332f788429907cdfec3677b3e98f0bc6f7dd542dc6 ports: - - '50051' + - "2130" + postgresql: + command: -p 6432 + container_name: tests-fq-generic-postgresql + environment: + POSTGRES_DB: db + POSTGRES_PASSWORD: password + POSTGRES_USER: user + image: postgres:15-bullseye@sha256:3411b9f2e5239cd7867f34fcf22fe964230f7d447a71d63c283e3593d3f84085 + volumes: + - ./postgresql:/docker-entrypoint-initdb.d + ydb: + container_name: tests-fq-generic-ydb + environment: + POSTGRES_PASSWORD: password + POSTGRES_USER: user + YDB_DEFAULT_LOG_LEVEL: DEBUG + hostname: tests-fq-generic-ydb + image: ghcr.io/ydb-platform/local-ydb:latest@sha256:9045e00afec1923dc3277564c7b2f829087c2115f45f18e1d38b80bb89f98be6 + volumes: + - ./ydb/init_ydb:/init_ydb + - ./ydb/01_basic.sh:/01_basic.sh +version: "3.4" diff --git a/ydb/tests/fq/generic/test_clickhouse.py b/ydb/tests/fq/generic/test_clickhouse.py index f3362894a056..08914b49dec7 100644 --- a/ydb/tests/fq/generic/test_clickhouse.py +++ b/ydb/tests/fq/generic/test_clickhouse.py @@ -6,7 +6,7 @@ from ydb.tests.tools.fq_runner.kikimr_utils import yq_v2 from ydb.tests.tools.fq_runner.fq_client import FederatedQueryClient -from utils.settings import Settings +from ydb.tests.fq.generic.utils.settings import Settings class TestClickHouse: diff --git a/ydb/tests/fq/generic/test_join.py b/ydb/tests/fq/generic/test_join.py index 183ca08234ce..5100d83b7205 100644 --- a/ydb/tests/fq/generic/test_join.py +++ b/ydb/tests/fq/generic/test_join.py @@ -2,19 +2,22 @@ import pytest import ydb.public.api.protos.draft.fq_pb2 as fq -from ydb.tests.tools.fq_runner.kikimr_utils import yq_v2 +from ydb.tests.tools.fq_runner.kikimr_utils import yq_all from ydb.tests.tools.fq_runner.fq_client import FederatedQueryClient -from utils.settings import Settings +from ydb.tests.fq.generic.utils.settings import Settings class TestJoin: - @yq_v2 + @yq_all + @pytest.mark.parametrize("mvp_external_ydb_endpoint", [{"endpoint": "tests-fq-generic-ydb:2136"}], indirect=True) @pytest.mark.parametrize("fq_client", [{"folder_id": "my_folder"}], indirect=True) - def test_simple(self, fq_client: FederatedQueryClient, settings: Settings): + @pytest.mark.parametrize("query_type", [fq.QueryContent.QueryType.ANALYTICS, fq.QueryContent.QueryType.STREAMING]) + def test_simple(self, fq_client: FederatedQueryClient, settings: Settings, query_type): table_name = 'join_table' ch_conn_name = f'ch_conn_{table_name}' pg_conn_name = f'pg_conn_{table_name}' + ydb_conn_name = f'ydb_conn_{table_name}' query_name = f'query_{table_name}' fq_client.create_postgresql_connection( @@ -33,26 +36,37 @@ def test_simple(self, fq_client: FederatedQueryClient, settings: Settings): password=settings.clickhouse.password, ) + fq_client.create_ydb_connection( + name=ydb_conn_name, + database_id=settings.ydb.dbname, + ) + sql = fR''' - SELECT pg.data AS data_pg, ch.data AS data_ch + SELECT pg.data AS data_pg, ch.data AS data_ch, ydb.data AS data_ydb FROM {pg_conn_name}.{table_name} AS pg JOIN {ch_conn_name}.{table_name} AS ch - ON pg.id = ch.id; + ON pg.id = ch.id + JOIN {ydb_conn_name}.{table_name} AS ydb + ON pg.id = ydb.id; ''' - query_id = fq_client.create_query(query_name, sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id + query_id = fq_client.create_query(query_name, sql, type=query_type).result.query_id fq_client.wait_query_status(query_id, fq.QueryMeta.COMPLETED) data = fq_client.get_result_data(query_id) result_set = data.result.result_set logging.debug(str(result_set)) - assert len(result_set.columns) == 2 + assert len(result_set.columns) == 3 assert result_set.columns[0].name == "data_pg" assert result_set.columns[1].name == "data_ch" + assert result_set.columns[2].name == "data_ydb" assert len(result_set.rows) == 3 assert result_set.rows[0].items[0].bytes_value == b'pg10' assert result_set.rows[0].items[1].bytes_value == b'ch10' + assert result_set.rows[0].items[2].bytes_value == b'ydb10', result_set assert result_set.rows[1].items[0].bytes_value == b'pg20' assert result_set.rows[1].items[1].bytes_value == b'ch20' + assert result_set.rows[1].items[2].bytes_value == b'ydb20' assert result_set.rows[2].items[0].bytes_value == b'pg30' assert result_set.rows[2].items[1].bytes_value == b'ch30' + assert result_set.rows[2].items[2].bytes_value == b'ydb30' diff --git a/ydb/tests/fq/generic/test_postgresql.py b/ydb/tests/fq/generic/test_postgresql.py index 59bcfe5b96fd..631abfbb3fe9 100644 --- a/ydb/tests/fq/generic/test_postgresql.py +++ b/ydb/tests/fq/generic/test_postgresql.py @@ -6,7 +6,7 @@ from ydb.tests.tools.fq_runner.kikimr_utils import yq_v2 from ydb.tests.tools.fq_runner.fq_client import FederatedQueryClient -from utils.settings import Settings +from ydb.tests.fq.generic.utils.settings import Settings class TestPostgreSQL: diff --git a/ydb/tests/fq/generic/test_streaming_join.py b/ydb/tests/fq/generic/test_streaming_join.py new file mode 100644 index 000000000000..cf9250def41f --- /dev/null +++ b/ydb/tests/fq/generic/test_streaming_join.py @@ -0,0 +1,61 @@ +import pytest +import os + +import ydb.public.api.protos.draft.fq_pb2 as fq +from ydb.tests.tools.fq_runner.kikimr_utils import yq_v1 + +from ydb.tests.tools.fq_runner.fq_client import FederatedQueryClient +from ydb.tests.tools.datastreams_helpers.test_yds_base import TestYdsBase +from ydb.tests.fq.generic.utils.settings import Settings + + +class TestStreamingJoin(TestYdsBase): + @yq_v1 + @pytest.mark.parametrize("mvp_external_ydb_endpoint", [{"endpoint": "tests-fq-generic-ydb:2136"}], indirect=True) + @pytest.mark.parametrize("fq_client", [{"folder_id": "my_folder"}], indirect=True) + def test_simple(self, kikimr, fq_client: FederatedQueryClient, settings: Settings, yq_version): + self.init_topics(f"pq_yq_streaming_test_simple{yq_version}") + fq_client.create_yds_connection("myyds", os.getenv("YDB_DATABASE"), os.getenv("YDB_ENDPOINT")) + + table_name = 'join_table' + ydb_conn_name = f'ydb_conn_{table_name}' + + fq_client.create_ydb_connection( + name=ydb_conn_name, + database_id=settings.ydb.dbname, + ) + + sql = R''' + $input = SELECT * FROM myyds.`{input_topic}`; + + $enriched = select e.Data as Data + from + $input as e + left join + ydb_conn_{table_name}.{table_name} as u + on(e.Data = CAST(u.id as String)) + ; + + insert into myyds.`{output_topic}` + select * from $enriched; + '''.format( + input_topic=self.input_topic, output_topic=self.output_topic, table_name=table_name + ) + + query_id = fq_client.create_query("simple", sql, type=fq.QueryContent.QueryType.STREAMING).result.query_id + fq_client.wait_query_status(query_id, fq.QueryMeta.RUNNING) + kikimr.compute_plane.wait_zero_checkpoint(query_id) + + messages = ['A', 'B', 'C'] + self.write_stream(messages) + + read_data = self.read_stream(len(messages)) + assert read_data == messages + + fq_client.abort_query(query_id) + fq_client.wait_query(query_id) + + describe_response = fq_client.describe_query(query_id) + status = describe_response.result.query.meta.status + assert not describe_response.issues, str(describe_response.issues) + assert status == fq.QueryMeta.ABORTED_BY_USER, fq.QueryMeta.ComputeStatus.Name(status) diff --git a/ydb/tests/fq/generic/test_ydb.py b/ydb/tests/fq/generic/test_ydb.py new file mode 100644 index 000000000000..2df1ed04e7a7 --- /dev/null +++ b/ydb/tests/fq/generic/test_ydb.py @@ -0,0 +1,45 @@ +import logging +import pytest + +import ydb.public.api.protos.draft.fq_pb2 as fq +import ydb.public.api.protos.ydb_value_pb2 as ydb +from ydb.tests.tools.fq_runner.kikimr_utils import yq_v2 + +from ydb.tests.tools.fq_runner.fq_client import FederatedQueryClient +from ydb.tests.fq.generic.utils.settings import Settings + + +class TestYdb: + @yq_v2 + @pytest.mark.parametrize("fq_client", [{"folder_id": "my_folder"}], indirect=True) + @pytest.mark.parametrize("mvp_external_ydb_endpoint", [{"endpoint": "tests-fq-generic-ydb:2136"}], indirect=True) + def test_simple(self, fq_client: FederatedQueryClient, settings: Settings): + table_name = 'simple_table' + conn_name = f'conn_{table_name}' + query_name = f'query_{table_name}' + + fq_client.create_ydb_connection( + name=conn_name, + database_id=settings.ydb.dbname, + ) + + sql = fR''' + SELECT * + FROM {conn_name}.{table_name}; + ''' + + query_id = fq_client.create_query(query_name, sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id + fq_client.wait_query_status(query_id, fq.QueryMeta.COMPLETED) + + data = fq_client.get_result_data(query_id) + result_set = data.result.result_set + logging.debug(str(result_set)) + assert len(result_set.columns) == 1 + assert result_set.columns[0].name == "number" + assert result_set.columns[0].type == ydb.Type( + optional_type=ydb.OptionalType(item=ydb.Type(type_id=ydb.Type.INT32)) + ) + assert len(result_set.rows) == 3 + assert result_set.rows[0].items[0].int32_value == 1 + assert result_set.rows[1].items[0].int32_value == 2 + assert result_set.rows[2].items[0].int32_value == 3 diff --git a/ydb/tests/tools/docker_compose_helpers/endpoint_determiner.py b/ydb/tests/fq/generic/utils/endpoint_determiner.py similarity index 100% rename from ydb/tests/tools/docker_compose_helpers/endpoint_determiner.py rename to ydb/tests/fq/generic/utils/endpoint_determiner.py diff --git a/ydb/tests/fq/generic/utils/settings.py b/ydb/tests/fq/generic/utils/settings.py index c822b07672fa..e231b219bfb8 100644 --- a/ydb/tests/fq/generic/utils/settings.py +++ b/ydb/tests/fq/generic/utils/settings.py @@ -4,7 +4,7 @@ import yatest.common -from ydb.tests.tools.docker_compose_helpers.endpoint_determiner import EndpointDeterminer +from ydb.tests.fq.generic.utils.endpoint_determiner import EndpointDeterminer @dataclass @@ -46,6 +46,14 @@ class PostgreSQL: postgresql: PostgreSQL + @dataclass + class Ydb: + dbname: str + username: str + password: str + + ydb: Ydb + @classmethod def from_env(cls) -> 'Settings': docker_compose_file = yatest.common.source_path('ydb/tests/fq/generic/docker-compose.yml') @@ -54,7 +62,7 @@ def from_env(cls) -> 'Settings': s = cls( connector=cls.Connector( grpc_host='localhost', - grpc_port=endpoint_determiner.get_port('connector', 50051), + grpc_port=endpoint_determiner.get_port('fq-connector-go', 2130), ), mdb_mock=cls.MdbMock( endpoint=environ['MDB_MOCK_ENDPOINT'], @@ -74,6 +82,7 @@ def from_env(cls) -> 'Settings': username='user', password='password', ), + ydb=cls.Ydb(dbname='local', username='user', password='password'), ) return s diff --git a/ydb/tests/fq/generic/utils/ya.make b/ydb/tests/fq/generic/utils/ya.make index 62d01a35a38a..5e3283aeb5c6 100644 --- a/ydb/tests/fq/generic/utils/ya.make +++ b/ydb/tests/fq/generic/utils/ya.make @@ -3,7 +3,12 @@ PY3_LIBRARY() STYLE_PYTHON() PY_SRCS( + endpoint_determiner.py settings.py ) +PEERDIR( + library/python/testing/yatest_common +) + END() diff --git a/ydb/tests/fq/generic/ya.make b/ydb/tests/fq/generic/ya.make index 3756dc05b184..96cb861a4d50 100644 --- a/ydb/tests/fq/generic/ya.make +++ b/ydb/tests/fq/generic/ya.make @@ -5,41 +5,60 @@ PY3TEST() STYLE_PYTHON() NO_CHECK_IMPORTS() -TAG( - ya:external - ya:force_sandbox - ya:fat -) - -REQUIREMENTS( - container:4467981730 - cpu:all - dns:dns64 -) - +INCLUDE(${ARCADIA_ROOT}/ydb/tests/tools/fq_runner/ydb_runner_with_datastreams.inc) INCLUDE(${ARCADIA_ROOT}/ydb/tests/tools/mdb_mock/recipe.inc) INCLUDE(${ARCADIA_ROOT}/ydb/tests/tools/token_accessor_mock/recipe.inc) -INCLUDE(${ARCADIA_ROOT}/ydb/tests/tools/fq_runner/ydb_runner_with_datastreams.inc) -INCLUDE(${ARCADIA_ROOT}/library/recipes/docker_compose/recipe.inc) -# Including of docker_compose/recipe.inc automatically converts these tests into LARGE, -# which makes it impossible to run them during precommit checks on Github CI. -# Next several lines forces these tests to be MEDIUM. To see discussion, visit YDBOPS-8928. +IF (AUTOCHECK) + # Temporarily disable these tests due to infrastructure incompatibility + SKIP_TEST("DEVTOOLSUPPORT-44637") + + # Split tests to chunks only when they're running on different machines with distbuild, + # otherwise this directive will slow down local test execution. + # Look through DEVTOOLSSUPPORT-39642 for more information. + FORK_SUBTESTS() + + # TAG and REQUIREMENTS are copied from: https://docs.yandex-team.ru/devtools/test/environment#docker-compose + TAG( + ya:external + ya:force_sandbox + ya:fat + ) + + REQUIREMENTS( + cpu:all + container:4467981730 + dns:dns64 + ) +ENDIF() + +INCLUDE(${ARCADIA_ROOT}/library/recipes/docker_compose/recipe.inc) IF (OPENSOURCE) + # Including of docker_compose/recipe.inc automatically converts these tests into LARGE, + # which makes it impossible to run them during precommit checks on Github CI. + # Next several lines forces these tests to be MEDIUM. To see discussion, visit YDBOPS-8928. SIZE(MEDIUM) SET(TEST_TAGS_VALUE) SET(TEST_REQUIREMENTS_VALUE) + + # This requirement forces tests to be launched consequently, + # otherwise CI system would be overloaded due to simultaneous launch of many Docker containers. + # See DEVTOOLSSUPPORT-44103, YA-1759 for details. + TAG(ya:not_autocheck) + REQUIREMENTS(cpu:all) ENDIF() +DEPENDS(ydb/tests/tools/pq_read) + PEERDIR( ydb/tests/fq/generic/utils + ydb/tests/tools/datastreams_helpers library/python/testing/recipe library/python/testing/yatest_common library/recipes/common ydb/tests/tools/fq_runner - ydb/tests/tools/docker_compose_helpers ydb/public/api/protos contrib/python/pytest @@ -50,6 +69,8 @@ TEST_SRCS( test_clickhouse.py test_join.py test_postgresql.py + test_streaming_join.py + test_ydb.py ) END() diff --git a/ydb/tests/fq/generic/ydb/01_basic.sh b/ydb/tests/fq/generic/ydb/01_basic.sh new file mode 100755 index 000000000000..2c94ade1c561 --- /dev/null +++ b/ydb/tests/fq/generic/ydb/01_basic.sh @@ -0,0 +1,29 @@ +#!/bin/bash + +set -ex + +/ydb -p ${PROFILE} yql -s ' + CREATE TABLE simple_table (number Int32, PRIMARY KEY (number)); + COMMIT; + INSERT INTO simple_table (number) VALUES + (1), + (2), + (3); + COMMIT; + + CREATE TABLE join_table (id Int32, data STRING, PRIMARY KEY (id)); + COMMIT; + INSERT INTO join_table (id, data) VALUES + (1, "ydb10"), + (2, "ydb20"), + (3, "ydb30"); + COMMIT; + ' + +retVal=$? +if [ $retVal -ne 0 ]; then + echo $retVal + exit $retVal +fi + +echo $(date +"%T.%6N") "SUCCESS" diff --git a/ydb/tests/fq/generic/ydb/init_ydb b/ydb/tests/fq/generic/ydb/init_ydb new file mode 100755 index 000000000000..d2a8cbdddfd3 --- /dev/null +++ b/ydb/tests/fq/generic/ydb/init_ydb @@ -0,0 +1,12 @@ +#!/bin/bash + +set -ex + +export PROFILE=tests-ydb-client-$(LC_ALL=C tr -dc A-Za-z0-9 str: + return request.param["endpoint"] if request is not None and hasattr(request, 'param') else None + + @pytest.fixture(scope="module") def s3(request) -> S3: port_manager = yatest.common.network.PortManager() @@ -66,6 +72,41 @@ def is_s3_ready(): def stats_mode(): return '' +@pytest.fixture(scope="module") +def kikimr_settings(request: pytest.FixtureRequest): + return getattr(request, "param", dict()) + + +@pytest.fixture(scope="module") +def kikimr_params(request: pytest.FixtureRequest): + return request + + +def get_kikimr_extensions(s3: S3, yq_version: str, kikimr_settings, mvp_external_ydb_endpoint): + return [AddInflightExtension(), + AddDataInflightExtension(), + AddFormatSizeLimitExtension(), + DefaultConfigExtension(s3.s3_url), + YQv2Extension(yq_version, kikimr_settings.get("is_replace_if_exists", False)), + ComputeExtension(), + YdbMvpExtension(mvp_external_ydb_endpoint), + StatsModeExtension(kikimr_settings.get("stats_mode", "")), + BindingsModeExtension(kikimr_settings.get("bindings_mode", ""), yq_version)] + + +@pytest.fixture(scope="module") +def kikimr_yqv1(kikimr_params: pytest.FixtureRequest, s3: S3, kikimr_settings, mvp_external_ydb_endpoint): + kikimr_extensions = get_kikimr_extensions(s3, YQV1_VERSION_NAME, kikimr_settings, mvp_external_ydb_endpoint) + with start_kikimr(kikimr_params, kikimr_extensions) as kikimr: + yield kikimr + + +@pytest.fixture(scope="module") +def kikimr_yqv2(kikimr_params: pytest.FixtureRequest, s3: S3, kikimr_settings, mvp_external_ydb_endpoint): + kikimr_extensions = get_kikimr_extensions(s3, YQV2_VERSION_NAME, kikimr_settings, mvp_external_ydb_endpoint) + with start_kikimr(kikimr_params, kikimr_extensions) as kikimr: + yield kikimr + @pytest.fixture def bindings_mode(): diff --git a/ydb/tests/fq/s3/test_s3.py b/ydb/tests/fq/s3/test_s3.py index 21fa0382a91e..5e3b564a04ac 100644 --- a/ydb/tests/fq/s3/test_s3.py +++ b/ydb/tests/fq/s3/test_s3.py @@ -3,6 +3,7 @@ import boto3 import logging +import os import pytest import time import ydb.public.api.protos.draft.fq_pb2 as fq @@ -287,7 +288,8 @@ def test_bad_format(self, kikimr, s3, client, runtime_listing, yq_version): @yq_v1 @pytest.mark.parametrize("client", [{"folder_id": "my_folder"}], indirect=True) - def test_checkpoints_on_join_s3_with_yds(self, kikimr, s3, client): + @pytest.mark.parametrize("mvp_external_ydb_endpoint", [{"endpoint": os.getenv("YDB_ENDPOINT")}], indirect=True) + def test_checkpoints_on_join_s3_with_yds(self, kikimr, s3, client, unique_prefix): # Prepare S3 resource = boto3.resource( "s3", @@ -450,7 +452,7 @@ def test_write_result(self, kikimr, s3, client, yq_version): kikimr.compute_plane.wait_bootstrap() client.create_storage_connection("fruitbucket", "wbucket") - time.sleep(10) # 2 x node info update period + time.sleep(10) # 2 x node info update period sql = f''' SELECT Fruit, sum(Price) as Price, sum(Weight) as Weight diff --git a/ydb/tests/tools/docker_compose_helpers/ya.make b/ydb/tests/tools/docker_compose_helpers/ya.make deleted file mode 100644 index 73db15dbcdc9..000000000000 --- a/ydb/tests/tools/docker_compose_helpers/ya.make +++ /dev/null @@ -1,11 +0,0 @@ -PY3_LIBRARY() - -PY_SRCS( - endpoint_determiner.py -) - -PEERDIR( - library/python/testing/yatest_common -) - -END() diff --git a/ydb/tests/tools/fq_runner/fq_client.py b/ydb/tests/tools/fq_runner/fq_client.py index 0a435033a11b..6f63420b60d7 100644 --- a/ydb/tests/tools/fq_runner/fq_client.py +++ b/ydb/tests/tools/fq_runner/fq_client.py @@ -380,13 +380,13 @@ def create_connection(self, request, check_issues=True): return FederatedQueryClient.Response(response.operation.issues, result, check_issues) @retry.retry_intrusive - def create_ydb_connection(self, name, database, endpoint, visibility=fq.Acl.Visibility.PRIVATE, - auth_method=AuthMethod.no_auth(), check_issues=True): + def create_ydb_connection(self, name, database_id, + secure=False, visibility=fq.Acl.Visibility.PRIVATE, auth_method=AuthMethod.service_account('sa'), check_issues=True): request = fq.CreateConnectionRequest() request.content.name = name ydb = request.content.setting.ydb_database - ydb.database = database - ydb.endpoint = endpoint + ydb.database_id = database_id + ydb.secure = secure ydb.auth.CopyFrom(auth_method) request.content.acl.visibility = visibility diff --git a/ydb/tests/tools/fq_runner/kikimr_runner.py b/ydb/tests/tools/fq_runner/kikimr_runner.py index 5ba9fff095bc..083be536d00c 100644 --- a/ydb/tests/tools/fq_runner/kikimr_runner.py +++ b/ydb/tests/tools/fq_runner/kikimr_runner.py @@ -465,12 +465,9 @@ def fill_config(self, control_plane): if self.compute_services: # yq services fq_config['pinger']['ping_period'] = "5s" # == "10s" / 2 - if self.control_services: - fq_config['private_api']['loopback'] = True - else: - fq_config['private_api']['task_service_endpoint'] = "localhost:" + str( - control_plane.port_allocator.get_node_port_allocator(1).grpc_port) - fq_config['private_api']['task_service_database'] = control_plane.tenant_name + fq_config['private_api']['task_service_endpoint'] = "localhost:" + str( + control_plane.port_allocator.get_node_port_allocator(1).grpc_port) + fq_config['private_api']['task_service_database'] = control_plane.tenant_name if len(self.config_generator.dc_mapping) > 0: fq_config['nodes_manager']['use_data_center'] = True fq_config['enable_task_counters'] = True @@ -526,7 +523,8 @@ def __init__(self, node_count=1, # Union[int, dict[str, TenantConfig]] tenant_mapping=None, # dict[str, str] cloud_mapping=None, # dict - dc_mapping=None # dict + dc_mapping=None, # dict + mvp_external_ydb_endpoint=None # str ): if tenant_mapping is None: tenant_mapping = {} @@ -539,6 +537,7 @@ def __init__(self, self.tenant_mapping = tenant_mapping self.cloud_mapping = cloud_mapping self.dc_mapping = dc_mapping + self.mvp_external_ydb_endpoint = mvp_external_ydb_endpoint class StreamingOverKikimr(object): @@ -549,7 +548,7 @@ def __init__(self, configuration = StreamingOverKikimrConfig() self.uuid = str(uuid.uuid4()) self.mvp_mock_port = PortManager().get_port() - self.mvp_mock_server = Process(target=MvpMockServer(self.mvp_mock_port).serve_forever) + self.mvp_mock_server = Process(target=MvpMockServer(self.mvp_mock_port, configuration.mvp_external_ydb_endpoint).serve_forever) self.tenants = {} _tenant_mapping = configuration.tenant_mapping.copy() if isinstance(configuration.node_count, dict): diff --git a/ydb/tests/tools/fq_runner/kikimr_utils.py b/ydb/tests/tools/fq_runner/kikimr_utils.py index b60a10f74024..3d79fdec15d2 100644 --- a/ydb/tests/tools/fq_runner/kikimr_utils.py +++ b/ydb/tests/tools/fq_runner/kikimr_utils.py @@ -264,6 +264,7 @@ def apply_to_kikimr(self, request, kikimr): kikimr.control_plane.fq_config['common']['disable_ssl_for_generic_data_sources'] = True kikimr.control_plane.fq_config['control_plane_storage']['available_connection'].append('POSTGRESQL_CLUSTER') kikimr.control_plane.fq_config['control_plane_storage']['available_connection'].append('CLICKHOUSE_CLUSTER') + kikimr.control_plane.fq_config['control_plane_storage']['available_connection'].append('YDB_DATABASE') generic = { 'connector': { @@ -276,6 +277,7 @@ def apply_to_kikimr(self, request, kikimr): } kikimr.compute_plane.fq_config['gateways']['generic'] = generic # v1 + kikimr.control_plane.fq_config['gateways']['generic'] = generic # v1 kikimr.compute_plane.qs_config['generic'] = generic # v2 @@ -298,9 +300,28 @@ def apply_to_kikimr(self, request, kikimr): kikimr.compute_plane.qs_config['generic']['mdb_gateway'] = self.endpoint kikimr.compute_plane.fq_config['common']['mdb_transform_host'] = False - kikimr.compute_plane.fq_config['common']['mdb_gateway'] = self.endpoint # v2 - kikimr.compute_plane.fq_config['gateways']['generic']['mdb_gateway'] = self.endpoint # v1 + kikimr.compute_plane.fq_config['common']['mdb_gateway'] = self.endpoint + kikimr.compute_plane.fq_config['gateways']['generic']['mdb_gateway'] = self.endpoint + kikimr.control_plane.fq_config['common']['mdb_transform_host'] = False + kikimr.control_plane.fq_config['common']['mdb_gateway'] = self.endpoint + kikimr.control_plane.fq_config['gateways']['generic']['mdb_gateway'] = self.endpoint + +class YdbMvpExtension(ExtensionPoint): + + def __init__(self, mvp_external_ydb_endpoint): + self.mvp_external_ydb_endpoint = mvp_external_ydb_endpoint + super().__init__() + + def is_applicable(self, request): + return True + + def apply_to_kikimr_conf(self, request, configuration): + configuration.mvp_external_ydb_endpoint = self.mvp_external_ydb_endpoint + + def apply_to_kikimr(self, request, kikimr): + if 'generic' in kikimr.compute_plane.qs_config: + kikimr.compute_plane.qs_config['generic']['ydb_mvp_endpoint'] = kikimr.control_plane.fq_config['common']['ydb_mvp_cloud_endpoint'] class TokenAccessorExtension(ExtensionPoint): @@ -329,6 +350,10 @@ def apply_to_kikimr(self, request, kikimr): kikimr.control_plane.fq_config['token_accessor']['use_ssl'] = self.use_ssl kikimr.control_plane.fq_config['token_accessor']['hmac_secret_file'] = self.hmac_secret_file + kikimr.compute_plane.fq_config['token_accessor']['enabled'] = True + kikimr.compute_plane.fq_config['token_accessor']['endpoint'] = self.endpoint + kikimr.compute_plane.fq_config['token_accessor']['use_ssl'] = self.use_ssl + @contextmanager def start_kikimr(request, kikimr_extensions): diff --git a/ydb/tests/tools/fq_runner/mvp_mock.py b/ydb/tests/tools/fq_runner/mvp_mock.py index c962595e8f40..355998027079 100644 --- a/ydb/tests/tools/fq_runner/mvp_mock.py +++ b/ydb/tests/tools/fq_runner/mvp_mock.py @@ -1,12 +1,16 @@ +from functools import partial from http.server import HTTPServer, BaseHTTPRequestHandler import json -import os import socket class MvpMockHttpHandler(BaseHTTPRequestHandler): protocol_version = "HTTP/1.1" + def __init__(self, ydb_database, *args, **kwargs): + self.ydb_database = ydb_database + super().__init__(*args, **kwargs) + def _set_headers(self, length): self.send_response(200, "OK") self.send_header("Content-type", "application/json; charset=utf-8") @@ -14,7 +18,7 @@ def _set_headers(self, length): self.end_headers() def do_GET(self): - endpoint = "{}/?database={}".format(os.getenv("YDB_ENDPOINT"), os.getenv("YDB_DATABASE")) + endpoint = "{}/?database={}".format(self.ydb_database, "local") self.log_message("send response {}".format(endpoint)) response = json.dumps({"endpoint" : endpoint}).encode("utf-8") self._set_headers(len(response)) @@ -26,9 +30,9 @@ class HTTPServerIPv6(HTTPServer): class MvpMockServer: - def __init__(self, port): + def __init__(self, port, ydb_endpoint): self.port = port - self.server = HTTPServerIPv6(('::', self.port), MvpMockHttpHandler) + self.server = HTTPServerIPv6(('::', self.port), partial(MvpMockHttpHandler, ydb_endpoint)) def handle_request(self): self.server.handle_request() diff --git a/ydb/tests/tools/ya.make b/ydb/tests/tools/ya.make index f391529f96f8..ce62665f2705 100644 --- a/ydb/tests/tools/ya.make +++ b/ydb/tests/tools/ya.make @@ -1,7 +1,6 @@ RECURSE( canondata_sync datastreams_helpers - docker_compose_helpers fq_runner idx_test kqprun