From a573b0c4c02afb445b1d7c2d71b7314524ca71b6 Mon Sep 17 00:00:00 2001 From: Maksim Tsoy Date: Sun, 14 Jan 2024 13:38:55 +0000 Subject: [PATCH] YQ Connector: integration tests --- .../libs/actors/clusters_from_connections.cpp | 6 +- ydb/core/fq/libs/config/protos/common.proto | 1 + .../actors/query_utils.cpp | 12 +- ydb/tests/fq/generic/clickhouse/init.sql | 5 + ydb/tests/fq/generic/conftest.py | 36 ++++++ ydb/tests/fq/generic/docker-compose.yml | 32 ++++++ ydb/tests/fq/generic/postgresql/init.sql | 5 + ydb/tests/fq/generic/test_clickhouse.py | 45 ++++++++ ydb/tests/fq/generic/test_join.py | 58 ++++++++++ ydb/tests/fq/generic/test_postgresql.py | 47 ++++++++ ydb/tests/fq/generic/utils/settings.py | 93 ++++++++++++++++ ydb/tests/fq/generic/utils/ya.make | 9 ++ ydb/tests/fq/generic/ya.make | 51 +++++++++ ydb/tests/fq/ya.make | 1 + .../endpoint_determiner.py | 22 ++++ .../tools/docker_compose_helpers/ya.make | 11 ++ ydb/tests/tools/fq_runner/fq_client.py | 32 ++++++ ydb/tests/tools/fq_runner/kikimr_runner.py | 6 + ydb/tests/tools/fq_runner/kikimr_utils.py | 85 +++++++++++++++ ydb/tests/tools/mdb_mock/__main__.py | 103 ++++++++++++++++++ ydb/tests/tools/mdb_mock/recipe.inc | 2 + ydb/tests/tools/mdb_mock/ya.make | 22 ++++ .../tools/token_accessor_mock/__main__.py | 84 ++++++++++++++ .../tools/token_accessor_mock/recipe.inc | 2 + ydb/tests/tools/token_accessor_mock/ya.make | 23 ++++ ydb/tests/tools/ya.make | 3 + 26 files changed, 786 insertions(+), 10 deletions(-) create mode 100644 ydb/tests/fq/generic/clickhouse/init.sql create mode 100644 ydb/tests/fq/generic/conftest.py create mode 100644 ydb/tests/fq/generic/docker-compose.yml create mode 100644 ydb/tests/fq/generic/postgresql/init.sql create mode 100644 ydb/tests/fq/generic/test_clickhouse.py create mode 100644 ydb/tests/fq/generic/test_join.py create mode 100644 ydb/tests/fq/generic/test_postgresql.py create mode 100644 ydb/tests/fq/generic/utils/settings.py create mode 100644 ydb/tests/fq/generic/utils/ya.make create mode 100644 ydb/tests/fq/generic/ya.make create mode 100644 ydb/tests/tools/docker_compose_helpers/endpoint_determiner.py create mode 100644 ydb/tests/tools/docker_compose_helpers/ya.make create mode 100644 ydb/tests/tools/mdb_mock/__main__.py create mode 100644 ydb/tests/tools/mdb_mock/recipe.inc create mode 100644 ydb/tests/tools/mdb_mock/ya.make create mode 100644 ydb/tests/tools/token_accessor_mock/__main__.py create mode 100644 ydb/tests/tools/token_accessor_mock/recipe.inc create mode 100644 ydb/tests/tools/token_accessor_mock/ya.make diff --git a/ydb/core/fq/libs/actors/clusters_from_connections.cpp b/ydb/core/fq/libs/actors/clusters_from_connections.cpp index fa505591c924..8405473860f0 100644 --- a/ydb/core/fq/libs/actors/clusters_from_connections.cpp +++ b/ydb/core/fq/libs/actors/clusters_from_connections.cpp @@ -120,11 +120,7 @@ void FillGenericClusterConfigBase( clusterCfg.mutable_credentials()->mutable_basic()->set_username(connection.login()); clusterCfg.mutable_credentials()->mutable_basic()->set_password(connection.password()); FillClusterAuth(clusterCfg, connection.auth(), authToken, accountIdSignatures); - - // Since resolver always returns secure ports, we'll always ask for secure connections - // between remote Connector and the data source: - // https://a.yandex-team.ru/arcadia/ydb/core/fq/libs/db_id_async_resolver_impl/mdb_host_transformer.cpp#L24 - clusterCfg.SetUseSsl(true); + clusterCfg.SetUseSsl(!common.GetDisableSslForGenericDataSources()); // In YQv1 we just hardcode desired protocols here. // In YQv2 protocol can be configured via `CREATE EXTERNAL DATA SOURCE` params. diff --git a/ydb/core/fq/libs/config/protos/common.proto b/ydb/core/fq/libs/config/protos/common.proto index 6c0ed75b22d3..e9553894ef7e 100644 --- a/ydb/core/fq/libs/config/protos/common.proto +++ b/ydb/core/fq/libs/config/protos/common.proto @@ -27,4 +27,5 @@ message TCommonConfig { uint64 MaxTasksPerStage = 12; bool KeepInternalErrors = 13; bool UseNativeProtocolForClickHouse = 14; + bool DisableSslForGenericDataSources = 15; } diff --git a/ydb/core/fq/libs/control_plane_proxy/actors/query_utils.cpp b/ydb/core/fq/libs/control_plane_proxy/actors/query_utils.cpp index 4af0f3bb234a..420c7743be3e 100644 --- a/ydb/core/fq/libs/control_plane_proxy/actors/query_utils.cpp +++ b/ydb/core/fq/libs/control_plane_proxy/actors/query_utils.cpp @@ -149,7 +149,7 @@ TString CreateAuthParamsQuery(const FederatedQuery::ConnectionSetting& setting, )", "auth_method"_a = ToString(authMethod), "login"_a = EncloseAndEscapeString(GetLogin(setting).GetOrElse({}), '"'), - "password_secret_name"_a = EncloseAndEscapeString(signer ? "k1" + name : TString{}, '"')); + "password_secret_name"_a = EncloseAndEscapeString("k2" + name, '"')); case EYdbComputeAuth::MDB_BASIC: return fmt::format( R"(, @@ -163,7 +163,7 @@ TString CreateAuthParamsQuery(const FederatedQuery::ConnectionSetting& setting, "service_account_id"_a = EncloseAndEscapeString(ExtractServiceAccountId(setting), '"'), "sa_secret_name"_a = EncloseAndEscapeString(signer ? "k1" + name : TString{}, '"'), "login"_a = EncloseAndEscapeString(GetLogin(setting).GetOrElse({}), '"'), - "password_secret_name"_a = EncloseAndEscapeString(signer ? "k2" + name : TString{}, '"')); + "password_secret_name"_a = EncloseAndEscapeString("k2" + name, '"')); } } @@ -185,11 +185,12 @@ TString MakeCreateExternalDataSourceQuery( MDB_CLUSTER_ID={mdb_cluster_id}, DATABASE_NAME={database_name}, PROTOCOL="{protocol}", - USE_TLS="true" + USE_TLS="{use_tls}" )", "mdb_cluster_id"_a = EncloseAndEscapeString(connectionContent.setting().clickhouse_cluster().database_id(), '"'), "database_name"_a = EncloseAndEscapeString(connectionContent.setting().clickhouse_cluster().database_name(), '"'), - "protocol"_a = common.GetUseNativeProtocolForClickHouse() ? "NATIVE" : "HTTP"); + "protocol"_a = common.GetUseNativeProtocolForClickHouse() ? "NATIVE" : "HTTP", + "use_tls"_a = common.GetDisableSslForGenericDataSources() ? "false" : "true"); break; case FederatedQuery::ConnectionSetting::kDataStreams: break; @@ -213,11 +214,12 @@ TString MakeCreateExternalDataSourceQuery( MDB_CLUSTER_ID={mdb_cluster_id}, DATABASE_NAME={database_name}, PROTOCOL="NATIVE", - USE_TLS="true" + USE_TLS="{use_tls}" {schema} )", "mdb_cluster_id"_a = EncloseAndEscapeString(connectionContent.setting().postgresql_cluster().database_id(), '"'), "database_name"_a = EncloseAndEscapeString(connectionContent.setting().postgresql_cluster().database_name(), '"'), + "use_tls"_a = common.GetDisableSslForGenericDataSources() ? "false" : "true", "schema"_a = schema ? ", SCHEMA=" + EncloseAndEscapeString(schema, '"') : TString{}); break; } diff --git a/ydb/tests/fq/generic/clickhouse/init.sql b/ydb/tests/fq/generic/clickhouse/init.sql new file mode 100644 index 000000000000..2f053a8b05ff --- /dev/null +++ b/ydb/tests/fq/generic/clickhouse/init.sql @@ -0,0 +1,5 @@ +CREATE TABLE db.simple_table (number INT) ENGINE = Log(); +INSERT INTO db.simple_table VALUES ((1)), ((2)), ((3)); + +CREATE TABLE db.join_table (id INT, data INT) ENGINE = Log(); +INSERT INTO db.join_table VALUES (1, 10), (2, 20), (3, 30); diff --git a/ydb/tests/fq/generic/conftest.py b/ydb/tests/fq/generic/conftest.py new file mode 100644 index 000000000000..e21444a3efc2 --- /dev/null +++ b/ydb/tests/fq/generic/conftest.py @@ -0,0 +1,36 @@ +import pytest + +from ydb.tests.tools.fq_runner.fq_client import FederatedQueryClient +from ydb.tests.tools.fq_runner.custom_hooks import * # noqa: F401,F403 Adding custom hooks for YQv2 support +from ydb.tests.tools.fq_runner.kikimr_utils import ConnectorExtension +from ydb.tests.tools.fq_runner.kikimr_utils import YQv2Extension +from ydb.tests.tools.fq_runner.kikimr_utils import TokenAccessorExtension +from ydb.tests.tools.fq_runner.kikimr_utils import MDBExtension +from ydb.tests.tools.fq_runner.kikimr_utils import start_kikimr + +from utils.settings import Settings + + +@pytest.fixture +def settings() -> Settings: + return Settings.from_env() + + +@pytest.fixture +def kikimr(request: pytest.FixtureRequest, settings: Settings, yq_version: str): + kikimr_extensions = [ + ConnectorExtension(settings.connector.grpc_host, settings.connector.grpc_port, False), + TokenAccessorExtension(settings.token_accessor_mock.endpoint, settings.token_accessor_mock.hmac_secret_file), + MDBExtension(settings.mdb_mock.endpoint), + YQv2Extension(yq_version), + ] + with start_kikimr(request, kikimr_extensions) as kikimr: + yield kikimr + + +@pytest.fixture +def fq_client(kikimr, request=None) -> FederatedQueryClient: + client = FederatedQueryClient( + request.param["folder_id"] if request is not None else "my_folder", streaming_over_kikimr=kikimr + ) + return client diff --git a/ydb/tests/fq/generic/docker-compose.yml b/ydb/tests/fq/generic/docker-compose.yml new file mode 100644 index 000000000000..023a5601996f --- /dev/null +++ b/ydb/tests/fq/generic/docker-compose.yml @@ -0,0 +1,32 @@ +version: '3.4' +services: + postgresql: + image: "postgres:15-bullseye@sha256:3411b9f2e5239cd7867f34fcf22fe964230f7d447a71d63c283e3593d3f84085" + container_name: ydb_tests_fq_generic_postgresql + environment: + POSTGRES_DB: db + POSTGRES_USER: user + POSTGRES_PASSWORD: password + volumes: + - ./postgresql:/docker-entrypoint-initdb.d + ports: + - '6432' + command: -p 6432 + clickhouse: + image: "clickhouse/clickhouse-server:23-alpine@sha256:b078c1cd294632afa2aeba3530e7ba2e568513da23304354f455a25fab575c06" + container_name: ydb_tests_fq_generic_clickhouse + environment: + CLICKHOUSE_DB: db + CLICKHOUSE_USER: user + CLICKHOUSE_DEFAULT_ACCESS_MANAGEMENT: 1 + CLICKHOUSE_PASSWORD: password + volumes: + - ./clickhouse:/docker-entrypoint-initdb.d + ports: + - '8123' + - '9000' + connector: + image: "ghcr.io/ydb-platform/fq-connector-go:v0.1.1-rc.2@sha256:e5c2d86bce9cb43420eed0ed534afe760fb90ad41229dbbf34af28023b219af3" + container_name: ydb_tests_fq_generic_fq-connector-go + ports: + - '50051' diff --git a/ydb/tests/fq/generic/postgresql/init.sql b/ydb/tests/fq/generic/postgresql/init.sql new file mode 100644 index 000000000000..f795d019cacf --- /dev/null +++ b/ydb/tests/fq/generic/postgresql/init.sql @@ -0,0 +1,5 @@ +CREATE TABLE simple_table (number INT); +INSERT INTO simple_table VALUES ((1)), ((2)), ((3)); + +CREATE TABLE join_table (id INT, data INT); +INSERT INTO join_table VALUES (1, 10), (2, 20), (3, 30); diff --git a/ydb/tests/fq/generic/test_clickhouse.py b/ydb/tests/fq/generic/test_clickhouse.py new file mode 100644 index 000000000000..f3362894a056 --- /dev/null +++ b/ydb/tests/fq/generic/test_clickhouse.py @@ -0,0 +1,45 @@ +import logging +import pytest + +import ydb.public.api.protos.draft.fq_pb2 as fq +import ydb.public.api.protos.ydb_value_pb2 as ydb +from ydb.tests.tools.fq_runner.kikimr_utils import yq_v2 + +from ydb.tests.tools.fq_runner.fq_client import FederatedQueryClient +from utils.settings import Settings + + +class TestClickHouse: + @yq_v2 + @pytest.mark.parametrize("fq_client", [{"folder_id": "my_folder"}], indirect=True) + def test_simple(self, fq_client: FederatedQueryClient, settings: Settings): + table_name = 'simple_table' + conn_name = f'conn_{table_name}' + query_name = f'query_{table_name}' + + fq_client.create_clickhouse_connection( + name=conn_name, + database_name=settings.clickhouse.dbname, + database_id='clickhouse_cluster_id', + login=settings.clickhouse.username, + password=settings.clickhouse.password, + ) + + sql = fR''' + SELECT * + FROM {conn_name}.{table_name}; + ''' + + query_id = fq_client.create_query(query_name, sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id + fq_client.wait_query_status(query_id, fq.QueryMeta.COMPLETED) + + data = fq_client.get_result_data(query_id) + result_set = data.result.result_set + logging.debug(str(result_set)) + assert len(result_set.columns) == 1 + assert result_set.columns[0].name == "number" + assert result_set.columns[0].type.type_id == ydb.Type.INT32 + assert len(result_set.rows) == 3 + assert result_set.rows[0].items[0].int32_value == 1 + assert result_set.rows[1].items[0].int32_value == 2 + assert result_set.rows[2].items[0].int32_value == 3 diff --git a/ydb/tests/fq/generic/test_join.py b/ydb/tests/fq/generic/test_join.py new file mode 100644 index 000000000000..63070744bb7e --- /dev/null +++ b/ydb/tests/fq/generic/test_join.py @@ -0,0 +1,58 @@ +import logging +import pytest + +import ydb.public.api.protos.draft.fq_pb2 as fq +from ydb.tests.tools.fq_runner.kikimr_utils import yq_v2 + +from ydb.tests.tools.fq_runner.fq_client import FederatedQueryClient +from utils.settings import Settings + + +class TestJoin: + @yq_v2 + @pytest.mark.parametrize("fq_client", [{"folder_id": "my_folder"}], indirect=True) + def test_simple(self, fq_client: FederatedQueryClient, settings: Settings): + table_name = 'join_table' + ch_conn_name = f'ch_conn_{table_name}' + pg_conn_name = f'pg_conn_{table_name}' + query_name = f'query_{table_name}' + + fq_client.create_postgresql_connection( + name=pg_conn_name, + database_name=settings.postgresql.dbname, + database_id='postgresql_cluster_id', + login=settings.postgresql.username, + password=settings.postgresql.password, + ) + + fq_client.create_clickhouse_connection( + name=ch_conn_name, + database_name=settings.clickhouse.dbname, + database_id='clickhouse_cluster_id', + login=settings.clickhouse.username, + password=settings.clickhouse.password, + ) + + sql = fR''' + SELECT pg.data AS data_pg, ch.data AS data_ch + FROM {pg_conn_name}.{table_name} AS pg + JOIN {ch_conn_name}.{table_name} AS ch + ON pg.id = ch.id; + ''' + + query_id = fq_client.create_query(query_name, sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id + fq_client.wait_query_status(query_id, fq.QueryMeta.COMPLETED) + + data = fq_client.get_result_data(query_id) + result_set = data.result.result_set + logging.debug(str(result_set)) + assert len(result_set.columns) == 2 + assert result_set.columns[0].name == "data_pg" + assert result_set.columns[1].name == "data_ch" + assert len(result_set.rows) == 3 + assert result_set.rows[0].items[0].int32_value == 10 + assert result_set.rows[0].items[1].int32_value == 10 + assert result_set.rows[1].items[0].int32_value == 20 + assert result_set.rows[1].items[1].int32_value == 20 + assert result_set.rows[2].items[0].int32_value == 30 + assert result_set.rows[2].items[1].int32_value == 30 diff --git a/ydb/tests/fq/generic/test_postgresql.py b/ydb/tests/fq/generic/test_postgresql.py new file mode 100644 index 000000000000..59bcfe5b96fd --- /dev/null +++ b/ydb/tests/fq/generic/test_postgresql.py @@ -0,0 +1,47 @@ +import logging +import pytest + +import ydb.public.api.protos.draft.fq_pb2 as fq +import ydb.public.api.protos.ydb_value_pb2 as ydb +from ydb.tests.tools.fq_runner.kikimr_utils import yq_v2 + +from ydb.tests.tools.fq_runner.fq_client import FederatedQueryClient +from utils.settings import Settings + + +class TestPostgreSQL: + @yq_v2 + @pytest.mark.parametrize("fq_client", [{"folder_id": "my_folder"}], indirect=True) + def test_simple(self, fq_client: FederatedQueryClient, settings: Settings): + table_name = 'simple_table' + conn_name = f'conn_{table_name}' + query_name = f'query_{table_name}' + + fq_client.create_postgresql_connection( + name=conn_name, + database_name=settings.postgresql.dbname, + database_id='postgresql_cluster_id', + login=settings.postgresql.username, + password=settings.postgresql.password, + ) + + sql = fR''' + SELECT * + FROM {conn_name}.{table_name}; + ''' + + query_id = fq_client.create_query(query_name, sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id + fq_client.wait_query_status(query_id, fq.QueryMeta.COMPLETED) + + data = fq_client.get_result_data(query_id) + result_set = data.result.result_set + logging.debug(str(result_set)) + assert len(result_set.columns) == 1 + assert result_set.columns[0].name == "number" + assert result_set.columns[0].type == ydb.Type( + optional_type=ydb.OptionalType(item=ydb.Type(type_id=ydb.Type.INT32)) + ) + assert len(result_set.rows) == 3 + assert result_set.rows[0].items[0].int32_value == 1 + assert result_set.rows[1].items[0].int32_value == 2 + assert result_set.rows[2].items[0].int32_value == 3 diff --git a/ydb/tests/fq/generic/utils/settings.py b/ydb/tests/fq/generic/utils/settings.py new file mode 100644 index 000000000000..4c30c89a8a49 --- /dev/null +++ b/ydb/tests/fq/generic/utils/settings.py @@ -0,0 +1,93 @@ +from os import environ +from dataclasses import dataclass +from typing import Optional + +import yatest.common + +from ydb.tests.tools.docker_compose_helpers.endpoint_determiner import EndpointDeterminer + + +@dataclass +class Settings: + @dataclass + class Connector: + grpc_host: str + grpc_port: int + + connector: Connector + + @dataclass + class MdbMock: + endpoint: str + + mdb_mock: MdbMock + + @dataclass + class TokenAccessorMock: + endpoint: str + hmac_secret_file: str + + token_accessor_mock: TokenAccessorMock + + @dataclass + class ClickHouse: + cluster_name: str + dbname: str + username: str + password: str + host: str + http_port: int + native_port: int + protocol: str + + clickhouse: ClickHouse + + @dataclass + class PostgreSQL: + cluster_name: str + dbname: str + username: str + password: Optional[str] + host: str + port: int + + postgresql: PostgreSQL + + @classmethod + def from_env(cls) -> 'Settings': + docker_compose_file = yatest.common.source_path('ydb/tests/fq/generic/docker-compose.yml') + endpoint_determiner = EndpointDeterminer(docker_compose_file) + + s = cls( + connector=cls.Connector( + grpc_host='localhost', + grpc_port=endpoint_determiner.get_port('connector', 50051), + ), + mdb_mock=cls.MdbMock( + endpoint=environ['MDB_MOCK_ENDPOINT'], + ), + token_accessor_mock=cls.TokenAccessorMock( + endpoint=environ['TOKEN_ACCESSOR_MOCK_ENDPOINT'], + hmac_secret_file=environ['TOKEN_ACCESSOR_HMAC_SECRET_FILE'], + ), + clickhouse=cls.ClickHouse( + cluster_name='clickhouse_integration_test', + dbname='db', + host='localhost', + http_port=endpoint_determiner.get_port('clickhouse', 8123), + native_port=endpoint_determiner.get_port('clickhouse', 9000), + username='user', + password='password', + protocol='native', + ), + postgresql=cls.PostgreSQL( + cluster_name='postgresql_integration_test', + dbname='db', + host='localhost', + port=endpoint_determiner.get_port('postgresql', 6432), + username='user', + password='password', + ), + ) + + return s diff --git a/ydb/tests/fq/generic/utils/ya.make b/ydb/tests/fq/generic/utils/ya.make new file mode 100644 index 000000000000..62d01a35a38a --- /dev/null +++ b/ydb/tests/fq/generic/utils/ya.make @@ -0,0 +1,9 @@ +PY3_LIBRARY() + +STYLE_PYTHON() + +PY_SRCS( + settings.py +) + +END() diff --git a/ydb/tests/fq/generic/ya.make b/ydb/tests/fq/generic/ya.make new file mode 100644 index 000000000000..d28e62b44907 --- /dev/null +++ b/ydb/tests/fq/generic/ya.make @@ -0,0 +1,51 @@ +OWNER(g:yq) + +PY3TEST() + +STYLE_PYTHON() +NO_CHECK_IMPORTS() + +SIZE(LARGE) + +TAG( + ya:external + ya:force_sandbox + ya:fat +) + +REQUIREMENTS( + container:4467981730 + cpu:all + dns:dns64 +) + +INCLUDE(${ARCADIA_ROOT}/ydb/tests/tools/mdb_mock/recipe.inc) +INCLUDE(${ARCADIA_ROOT}/ydb/tests/tools/token_accessor_mock/recipe.inc) +INCLUDE(${ARCADIA_ROOT}/ydb/tests/tools/fq_runner/ydb_runner_with_datastreams.inc) +INCLUDE(${ARCADIA_ROOT}/library/recipes/docker_compose/recipe.inc) + +PEERDIR( + ydb/tests/fq/generic/utils + + library/python/testing/recipe + library/python/testing/yatest_common + library/recipes/common + ydb/tests/tools/fq_runner + ydb/tests/tools/docker_compose_helpers + ydb/public/api/protos + + contrib/python/pytest +) + +DEPENDS( + contrib/python/moto/bin +) + +TEST_SRCS( + conftest.py + test_clickhouse.py + test_join.py + test_postgresql.py +) + +END() diff --git a/ydb/tests/fq/ya.make b/ydb/tests/fq/ya.make index d06db5432121..734e38f7f709 100644 --- a/ydb/tests/fq/ya.make +++ b/ydb/tests/fq/ya.make @@ -1,5 +1,6 @@ RECURSE_FOR_TESTS( common + generic http_api mem_alloc multi_plane diff --git a/ydb/tests/tools/docker_compose_helpers/endpoint_determiner.py b/ydb/tests/tools/docker_compose_helpers/endpoint_determiner.py new file mode 100644 index 000000000000..7d5b2b13d855 --- /dev/null +++ b/ydb/tests/tools/docker_compose_helpers/endpoint_determiner.py @@ -0,0 +1,22 @@ +import os +import subprocess + +import yatest.common + + +class EndpointDeterminer: + docker_compose_bin: os.PathLike + docker_compose_yml: os.PathLike + + def __init__(self, docker_compose_yml: os.PathLike): + self.docker_compose_bin = yatest.common.build_path('library/recipes/docker_compose/bin/docker-compose') + self.docker_compose_yml = docker_compose_yml + + def get_port(self, service_name: str, internal_port: int) -> int: + cmd = [self.docker_compose_bin, '-f', self.docker_compose_yml, 'port', service_name, str(internal_port)] + try: + out = subprocess.check_output(cmd, stderr=subprocess.STDOUT) + external_port = int(out.split(b':')[1]) + return external_port + except subprocess.CalledProcessError as e: + raise RuntimeError(f"docker-compose error: {e.output} (code {e.returncode})") diff --git a/ydb/tests/tools/docker_compose_helpers/ya.make b/ydb/tests/tools/docker_compose_helpers/ya.make new file mode 100644 index 000000000000..36866791727b --- /dev/null +++ b/ydb/tests/tools/docker_compose_helpers/ya.make @@ -0,0 +1,11 @@ +PY23_LIBRARY() + +PY_SRCS( + endpoint_determiner.py +) + +PEERDIR( + library/python/testing/yatest_common +) + +END() diff --git a/ydb/tests/tools/fq_runner/fq_client.py b/ydb/tests/tools/fq_runner/fq_client.py index 1069be5bcad8..0a435033a11b 100644 --- a/ydb/tests/tools/fq_runner/fq_client.py +++ b/ydb/tests/tools/fq_runner/fq_client.py @@ -411,6 +411,38 @@ def create_yds_connection(self, name, database=None, endpoint=None, database_id= request.content.acl.visibility = visibility return self.create_connection(request, check_issues) + @retry.retry_intrusive + def create_postgresql_connection(self, name, database_name, database_id, login, password, + secure=False, visibility=fq.Acl.Visibility.PRIVATE, auth_method=AuthMethod.service_account('sa'), check_issues=True): + request = fq.CreateConnectionRequest() + request.content.name = name + pg = request.content.setting.postgresql_cluster + pg.database_name = database_name + pg.database_id = database_id + pg.secure = secure + pg.login = login + pg.password = password + + pg.auth.CopyFrom(auth_method) + request.content.acl.visibility = visibility + return self.create_connection(request, check_issues) + + @retry.retry_intrusive + def create_clickhouse_connection(self, name, database_name, database_id, login, password, + secure=False, visibility=fq.Acl.Visibility.PRIVATE, auth_method=AuthMethod.service_account('sa'), check_issues=True): + request = fq.CreateConnectionRequest() + request.content.name = name + ch = request.content.setting.clickhouse_cluster + ch.database_name = database_name + ch.database_id = database_id + ch.secure = secure + ch.login = login + ch.password = password + + ch.auth.CopyFrom(auth_method) + request.content.acl.visibility = visibility + return self.create_connection(request, check_issues) + @retry.retry_intrusive def list_connections(self, visibility, name_substring=None, limit=100, check_issues=True, page_token=""): request = fq.ListConnectionsRequest() diff --git a/ydb/tests/tools/fq_runner/kikimr_runner.py b/ydb/tests/tools/fq_runner/kikimr_runner.py index 85addb6621db..fb297ada4c18 100644 --- a/ydb/tests/tools/fq_runner/kikimr_runner.py +++ b/ydb/tests/tools/fq_runner/kikimr_runner.py @@ -74,6 +74,12 @@ def qs_config(self): if 'query_service_config' not in self.config_generator.yaml_config: self.config_generator.yaml_config['query_service_config'] = {} return self.config_generator.yaml_config['query_service_config'] + + @property + def auth_config(self): + if 'auth_config' not in self.config_generator.yaml_config: + self.config_generator.yaml_config['auth_config'] = {} + return self.config_generator.yaml_config['auth_config'] def enable_logging(self, component, level=LogLevels.TRACE): log_config = self.config_generator.yaml_config['log_config'] diff --git a/ydb/tests/tools/fq_runner/kikimr_utils.py b/ydb/tests/tools/fq_runner/kikimr_utils.py index 2ffcbd392ff2..2b1d316b6e38 100644 --- a/ydb/tests/tools/fq_runner/kikimr_utils.py +++ b/ydb/tests/tools/fq_runner/kikimr_utils.py @@ -243,6 +243,91 @@ def apply_to_kikimr(self, request, kikimr): kikimr.compute_plane.config_generator.yaml_config["table_service_config"]["bindings_mode"] = self.bindings_mode +class ConnectorExtension(ExtensionPoint): + + def __init__(self, host, port, use_ssl): + ConnectorExtension.__init__.__annotations__ = { + 'host' : str, + 'port': int, + 'use_ssl': bool, + 'return': None + } + super().__init__() + self.host = host + self.port = port + self.use_ssl = use_ssl + + def is_applicable(self, request): + return True + + def apply_to_kikimr(self, request, kikimr): + kikimr.control_plane.fq_config['common']['disable_ssl_for_generic_data_sources'] = True + kikimr.control_plane.fq_config['control_plane_storage']['available_connection'].append('POSTGRESQL_CLUSTER') + kikimr.control_plane.fq_config['control_plane_storage']['available_connection'].append('CLICKHOUSE_CLUSTER') + + generic = { + 'connector': { + 'endpoint': { + 'host': self.host, + 'port': self.port, + }, + 'use_ssl': self.use_ssl, + }, + } + + kikimr.compute_plane.fq_config['gateways']['generic'] = generic # v1 + kikimr.compute_plane.qs_config['generic'] = generic # v2 + + +class MDBExtension(ExtensionPoint): + + def __init__(self, endpoint: str, use_ssl=False): + MDBExtension.__init__.__annotations__ = { + 'endpoint': str, + 'use_ssl': bool + } + super().__init__() + self.endpoint = endpoint + self.use_ssl = use_ssl + + def is_applicable(self, request): + return True + + def apply_to_kikimr(self, request, kikimr): + kikimr.compute_plane.qs_config['mdb_transform_host'] = False + + kikimr.compute_plane.fq_config['common']['mdb_gateway'] = self.endpoint # v2 + kikimr.compute_plane.fq_config['gateways']['generic']['mdb_gateway'] = self.endpoint # v1 + + +class TokenAccessorExtension(ExtensionPoint): + + def __init__(self, endpoint: str, hmac_secret_file: str, use_ssl=False): + TokenAccessorExtension.__init__.__annotations__ = { + 'endpoint': str, + 'hmac_secret_file': str, + 'use_ssl': bool, + } + super().__init__() + self.endpoint = endpoint + self.hmac_secret_file = hmac_secret_file + self.use_ssl = use_ssl + + def is_applicable(self, request): + return True + + def apply_to_kikimr(self, request, kikimr): + kikimr.compute_plane.auth_config['token_accessor_config'] = { + 'enabled': True, + 'endpoint': self.endpoint, + } + + kikimr.control_plane.fq_config['token_accessor']['enabled'] = True + kikimr.control_plane.fq_config['token_accessor']['endpoint'] = self.endpoint + kikimr.control_plane.fq_config['token_accessor']['use_ssl'] = self.use_ssl + kikimr.control_plane.fq_config['token_accessor']['hmac_secret_file'] = self.hmac_secret_file + + @contextmanager def start_kikimr(request, kikimr_extensions): start_kikimr.__annotations__ = { diff --git a/ydb/tests/tools/mdb_mock/__main__.py b/ydb/tests/tools/mdb_mock/__main__.py new file mode 100644 index 000000000000..1fbb4ce920a2 --- /dev/null +++ b/ydb/tests/tools/mdb_mock/__main__.py @@ -0,0 +1,103 @@ +import os +import logging +from typing import Final + +import json +from aiohttp import web + +import yatest.common as yat +from library.python.testing.recipe import declare_recipe, set_env +from library.recipes.common import find_free_ports, start_daemon + +logger = logging.getLogger('mdb_mock.recipe') + +MDB_MOCK_PID_FILE: Final = 'recipe.mdb_mock.pid' + + +async def clickhouse_handler(request): + cluster_id = request.match_info['cluster_id'] + + if cluster_id == 'clickhouse_cluster_id': + return web.Response(body=json.dumps( + { + 'hosts': [ + { + 'name': 'ydb_tests_fq_generic_clickhouse', + 'cluster_id': cluster_id, + 'health': 'ALIVE', + 'type': 'CLICKHOUSE' + }, + ] + })) + + return web.Response(body=json.dumps({})) + + +async def postgresql_handler(request): + cluster_id = request.match_info['cluster_id'] + + if cluster_id == 'postgresql_cluster_id': + return web.Response(body=json.dumps( + { + 'hosts': [ + { + 'name': 'ydb_tests_fq_generic_postgresql', + 'services': [ + { + 'health': 'ALIVE', + }, + ], + } + ] + })) + return web.Response(body=json.dumps({})) + + +def serve(port: int): + app = web.Application() + app.add_routes([web.get('/managed-clickhouse/v1/clusters/{cluster_id}/hosts', clickhouse_handler)]) + app.add_routes([web.get('/managed-postgresql/v1/clusters/{cluster_id}/hosts', postgresql_handler)]) + web.run_app(app, port=port) + + +def start(argv): + logger.debug('Start arguments: %s', argv) + + host = "0.0.0.0" + port = find_free_ports(1)[0] + _update_environment(host=host, port=port) + + pid = os.fork() + if pid == 0: + logger.info('Starting mdb_mock server...') + serve(port=port) + else: + with open(MDB_MOCK_PID_FILE, "w") as f: + f.write(str(pid)) + + +def _update_environment(host: str, port: int): + variables = { + 'MDB_MOCK_ENDPOINT': f'http://{host}:{port}', + } + + for k, v in variables.items(): + set_env(k, v) + + +def stop(argv): + logger.debug('Start arguments: %s', argv) + logger.info('Terminating mdb_mock server...') + try: + with open(yat.work_path(MDB_MOCK_PID_FILE)) as fin: + pid = fin.read() + except IOError: + logger.error('Can not find server PID') + else: + logger.info('Terminate mdb_mock server PID: %s', pid) + os.kill(int(pid), 15) + logger.info('Server terminated.') + + +if __name__ == "__main__": + declare_recipe(start, stop) diff --git a/ydb/tests/tools/mdb_mock/recipe.inc b/ydb/tests/tools/mdb_mock/recipe.inc new file mode 100644 index 000000000000..f60cd2890782 --- /dev/null +++ b/ydb/tests/tools/mdb_mock/recipe.inc @@ -0,0 +1,2 @@ +DEPENDS(ydb/tests/tools/mdb_mock) +USE_RECIPE(ydb/tests/tools/mdb_mock/recipe) diff --git a/ydb/tests/tools/mdb_mock/ya.make b/ydb/tests/tools/mdb_mock/ya.make new file mode 100644 index 000000000000..6a63bf3567af --- /dev/null +++ b/ydb/tests/tools/mdb_mock/ya.make @@ -0,0 +1,22 @@ +OWNER( + tsmax2004 + g:yq +) + +PY3_PROGRAM(recipe) + +STYLE_PYTHON() + +PY_SRCS( + __main__.py +) + +PEERDIR( + library/python/testing/recipe + library/python/testing/yatest_common + library/recipes/common + + contrib/python/aiohttp +) + +END() diff --git a/ydb/tests/tools/token_accessor_mock/__main__.py b/ydb/tests/tools/token_accessor_mock/__main__.py new file mode 100644 index 000000000000..dabac2518c72 --- /dev/null +++ b/ydb/tests/tools/token_accessor_mock/__main__.py @@ -0,0 +1,84 @@ +import os +import logging +from typing import Final + +from concurrent import futures + +import yatest.common as yat +from library.python.testing.recipe import declare_recipe, set_env +from library.recipes.common import find_free_ports + +import grpc +from ydb.library.yql.providers.common.token_accessor.grpc.token_accessor_pb_pb2_grpc import \ + TokenAccessorServiceServicer, add_TokenAccessorServiceServicer_to_server +from ydb.library.yql.providers.common.token_accessor.grpc.token_accessor_pb_pb2 import GetTokenRequest, GetTokenResponse + +logger = logging.getLogger('token_accessor_mock.recipe') + +TOKEN_ACCESSOR_PID_FILE: Final = 'TOKEN_ACCESSOR_PID_FILE' +TOKEN_ACCESSOR_HMAC_SECRET_FILE: Final = 'TOKEN_ACCESSOR_HMAC_SECRET_FILE' + + +class TokenAccessor(TokenAccessorServiceServicer): + def GetToken(self, request: GetTokenRequest, context) -> GetTokenResponse: + logger.debug('GetToken request: %s', request) + return GetTokenResponse(token='token'.encode()) + + +def serve(port: int) -> None: + server = grpc.server(futures.ThreadPoolExecutor(max_workers=2)) + add_TokenAccessorServiceServicer_to_server( + TokenAccessor(), server + ) + server.add_insecure_port(f'[::]:{port}') + server.start() + logger.info(f'token_accessor_mock server started at {port}') + + server.wait_for_termination() + logger.info('token_accessor_mock server stopped') + + +def start(argv): + logger.debug('Start arguments: %s', argv) + + with open(TOKEN_ACCESSOR_HMAC_SECRET_FILE, "w") as f: + f.write('hmac_secret') + + port = find_free_ports(1)[0] + _update_environment(port=port) + + pid = os.fork() + if pid == 0: + logger.info('Starting token_accessor_mock server...') + serve(port=port) + else: + with open(TOKEN_ACCESSOR_PID_FILE, "w") as f: + f.write(str(pid)) + + +def _update_environment(port: int): + variables = { + 'TOKEN_ACCESSOR_MOCK_ENDPOINT': f'localhost:{port}', + 'TOKEN_ACCESSOR_HMAC_SECRET_FILE': os.path.abspath(TOKEN_ACCESSOR_HMAC_SECRET_FILE) + } + + for k, v in variables.items(): + set_env(k, v) + + +def stop(argv): + logger.debug('Stop arguments: %s', argv) + logger.info('Terminating token_accessor_mock server...') + try: + with open(yat.work_path(TOKEN_ACCESSOR_PID_FILE)) as fin: + pid = fin.read() + except IOError: + logger.error('Can not find server PID') + else: + logger.info('Terminate token_accessor_mock server PID: %s', pid) + os.kill(int(pid), 15) + logger.info('Server terminated.') + + +if __name__ == "__main__": + declare_recipe(start, stop) diff --git a/ydb/tests/tools/token_accessor_mock/recipe.inc b/ydb/tests/tools/token_accessor_mock/recipe.inc new file mode 100644 index 000000000000..6d7d51f7fdde --- /dev/null +++ b/ydb/tests/tools/token_accessor_mock/recipe.inc @@ -0,0 +1,2 @@ +DEPENDS(ydb/tests/tools/token_accessor_mock) +USE_RECIPE(ydb/tests/tools/token_accessor_mock/recipe) diff --git a/ydb/tests/tools/token_accessor_mock/ya.make b/ydb/tests/tools/token_accessor_mock/ya.make new file mode 100644 index 000000000000..60b91b8291f7 --- /dev/null +++ b/ydb/tests/tools/token_accessor_mock/ya.make @@ -0,0 +1,23 @@ +OWNER( + tsmax2004 + g:yq +) + +PY3_PROGRAM(recipe) + +STYLE_PYTHON() + +PY_SRCS( + __main__.py +) + +PEERDIR( + library/python/testing/recipe + library/python/testing/yatest_common + library/recipes/common + + contrib/python/grpcio + ydb/library/yql/providers/common/token_accessor/grpc +) + +END() diff --git a/ydb/tests/tools/ya.make b/ydb/tests/tools/ya.make index c9a05875a7a3..f391529f96f8 100644 --- a/ydb/tests/tools/ya.make +++ b/ydb/tests/tools/ya.make @@ -1,11 +1,14 @@ RECURSE( canondata_sync datastreams_helpers + docker_compose_helpers fq_runner idx_test kqprun + mdb_mock pq_read s3_recipe + token_accessor_mock ydb_serializable ydb_serializable/replay )