Skip to content

Commit

Permalink
YQ Connector: integration tests
Browse files Browse the repository at this point in the history
  • Loading branch information
tsmax2004 committed Jan 14, 2024
1 parent c5256a8 commit a573b0c
Show file tree
Hide file tree
Showing 26 changed files with 786 additions and 10 deletions.
6 changes: 1 addition & 5 deletions ydb/core/fq/libs/actors/clusters_from_connections.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -120,11 +120,7 @@ void FillGenericClusterConfigBase(
clusterCfg.mutable_credentials()->mutable_basic()->set_username(connection.login());
clusterCfg.mutable_credentials()->mutable_basic()->set_password(connection.password());
FillClusterAuth(clusterCfg, connection.auth(), authToken, accountIdSignatures);

// Since resolver always returns secure ports, we'll always ask for secure connections
// between remote Connector and the data source:
// https://a.yandex-team.ru/arcadia/ydb/core/fq/libs/db_id_async_resolver_impl/mdb_host_transformer.cpp#L24
clusterCfg.SetUseSsl(true);
clusterCfg.SetUseSsl(!common.GetDisableSslForGenericDataSources());

// In YQv1 we just hardcode desired protocols here.
// In YQv2 protocol can be configured via `CREATE EXTERNAL DATA SOURCE` params.
Expand Down
1 change: 1 addition & 0 deletions ydb/core/fq/libs/config/protos/common.proto
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,5 @@ message TCommonConfig {
uint64 MaxTasksPerStage = 12;
bool KeepInternalErrors = 13;
bool UseNativeProtocolForClickHouse = 14;
bool DisableSslForGenericDataSources = 15;
}
12 changes: 7 additions & 5 deletions ydb/core/fq/libs/control_plane_proxy/actors/query_utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ TString CreateAuthParamsQuery(const FederatedQuery::ConnectionSetting& setting,
)",
"auth_method"_a = ToString(authMethod),
"login"_a = EncloseAndEscapeString(GetLogin(setting).GetOrElse({}), '"'),
"password_secret_name"_a = EncloseAndEscapeString(signer ? "k1" + name : TString{}, '"'));
"password_secret_name"_a = EncloseAndEscapeString("k2" + name, '"'));
case EYdbComputeAuth::MDB_BASIC:
return fmt::format(
R"(,
Expand All @@ -163,7 +163,7 @@ TString CreateAuthParamsQuery(const FederatedQuery::ConnectionSetting& setting,
"service_account_id"_a = EncloseAndEscapeString(ExtractServiceAccountId(setting), '"'),
"sa_secret_name"_a = EncloseAndEscapeString(signer ? "k1" + name : TString{}, '"'),
"login"_a = EncloseAndEscapeString(GetLogin(setting).GetOrElse({}), '"'),
"password_secret_name"_a = EncloseAndEscapeString(signer ? "k2" + name : TString{}, '"'));
"password_secret_name"_a = EncloseAndEscapeString("k2" + name, '"'));
}
}

Expand All @@ -185,11 +185,12 @@ TString MakeCreateExternalDataSourceQuery(
MDB_CLUSTER_ID={mdb_cluster_id},
DATABASE_NAME={database_name},
PROTOCOL="{protocol}",
USE_TLS="true"
USE_TLS="{use_tls}"
)",
"mdb_cluster_id"_a = EncloseAndEscapeString(connectionContent.setting().clickhouse_cluster().database_id(), '"'),
"database_name"_a = EncloseAndEscapeString(connectionContent.setting().clickhouse_cluster().database_name(), '"'),
"protocol"_a = common.GetUseNativeProtocolForClickHouse() ? "NATIVE" : "HTTP");
"protocol"_a = common.GetUseNativeProtocolForClickHouse() ? "NATIVE" : "HTTP",
"use_tls"_a = common.GetDisableSslForGenericDataSources() ? "false" : "true");
break;
case FederatedQuery::ConnectionSetting::kDataStreams:
break;
Expand All @@ -213,11 +214,12 @@ TString MakeCreateExternalDataSourceQuery(
MDB_CLUSTER_ID={mdb_cluster_id},
DATABASE_NAME={database_name},
PROTOCOL="NATIVE",
USE_TLS="true"
USE_TLS="{use_tls}"
{schema}
)",
"mdb_cluster_id"_a = EncloseAndEscapeString(connectionContent.setting().postgresql_cluster().database_id(), '"'),
"database_name"_a = EncloseAndEscapeString(connectionContent.setting().postgresql_cluster().database_name(), '"'),
"use_tls"_a = common.GetDisableSslForGenericDataSources() ? "false" : "true",
"schema"_a = schema ? ", SCHEMA=" + EncloseAndEscapeString(schema, '"') : TString{});
break;
}
Expand Down
5 changes: 5 additions & 0 deletions ydb/tests/fq/generic/clickhouse/init.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
-- Fixture data for the ClickHouse side of the generic FQ integration tests.

-- Single-column table holding the values 1, 2 and 3.
CREATE TABLE db.simple_table (number INT) ENGINE = Log();
INSERT INTO db.simple_table VALUES ((1)), ((2)), ((3));

-- Two-column (id, data) table with three rows: (1, 10), (2, 20), (3, 30).
CREATE TABLE db.join_table (id INT, data INT) ENGINE = Log();
INSERT INTO db.join_table VALUES (1, 10), (2, 20), (3, 30);
36 changes: 36 additions & 0 deletions ydb/tests/fq/generic/conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
import pytest

from ydb.tests.tools.fq_runner.fq_client import FederatedQueryClient
from ydb.tests.tools.fq_runner.custom_hooks import * # noqa: F401,F403 Adding custom hooks for YQv2 support
from ydb.tests.tools.fq_runner.kikimr_utils import ConnectorExtension
from ydb.tests.tools.fq_runner.kikimr_utils import YQv2Extension
from ydb.tests.tools.fq_runner.kikimr_utils import TokenAccessorExtension
from ydb.tests.tools.fq_runner.kikimr_utils import MDBExtension
from ydb.tests.tools.fq_runner.kikimr_utils import start_kikimr

from utils.settings import Settings


@pytest.fixture
def settings() -> Settings:
    """Provide the test settings produced by ``Settings.from_env``."""
    env_settings = Settings.from_env()
    return env_settings


@pytest.fixture
def kikimr(request: pytest.FixtureRequest, settings: Settings, yq_version: str):
    """Start kikimr with the extensions used by these tests and yield it.

    The instance is created through ``start_kikimr`` used as a context
    manager, so the context is exited once the requesting test is done.

    Args:
        request: pytest request object, forwarded to ``start_kikimr``.
        settings: endpoints of the connector, token accessor mock and MDB mock.
        yq_version: value forwarded to ``YQv2Extension``.
    """
    connector_ext = ConnectorExtension(
        settings.connector.grpc_host,
        settings.connector.grpc_port,
        False,  # NOTE(review): presumably the "use TLS" flag -- confirm against ConnectorExtension
    )
    token_accessor_ext = TokenAccessorExtension(
        settings.token_accessor_mock.endpoint,
        settings.token_accessor_mock.hmac_secret_file,
    )
    mdb_ext = MDBExtension(settings.mdb_mock.endpoint)
    yq_ext = YQv2Extension(yq_version)

    extensions = [connector_ext, token_accessor_ext, mdb_ext, yq_ext]
    with start_kikimr(request, extensions) as running_kikimr:
        yield running_kikimr


@pytest.fixture
def fq_client(kikimr, request: pytest.FixtureRequest) -> FederatedQueryClient:
    """Create a ``FederatedQueryClient`` bound to the running kikimr instance.

    The folder id can be supplied through indirect parametrization, e.g.
    ``@pytest.mark.parametrize("fq_client", [{"folder_id": "x"}], indirect=True)``.
    When the fixture is not parametrized (or the parameter has no
    ``folder_id`` key) the folder ``"my_folder"`` is used.

    Args:
        kikimr: running kikimr instance, passed as ``streaming_over_kikimr``.
        request: pytest request object carrying the optional indirect parameter.

    Returns:
        The client used by the tests to create connections and queries.
    """
    # pytest only injects fixture arguments that have no default value, so
    # ``request`` must be a plain argument for the indirect parameter to be
    # visible here. ``request.param`` exists only when the fixture is
    # parametrized, hence the defensive lookup.
    params = getattr(request, "param", None) or {}
    folder_id = params.get("folder_id", "my_folder")
    return FederatedQueryClient(folder_id, streaming_over_kikimr=kikimr)
32 changes: 32 additions & 0 deletions ydb/tests/fq/generic/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
# Services required by the generic FQ integration tests:
# a PostgreSQL server, a ClickHouse server and the fq-connector-go service.
# All images are pinned by digest.
version: '3.4'
services:
  postgresql:
    image: "postgres:15-bullseye@sha256:3411b9f2e5239cd7867f34fcf22fe964230f7d447a71d63c283e3593d3f84085"
    container_name: ydb_tests_fq_generic_postgresql
    environment:
      POSTGRES_DB: db
      POSTGRES_USER: user
      POSTGRES_PASSWORD: password
    volumes:
      # Scripts from ./postgresql are mounted into the image's init directory.
      - ./postgresql:/docker-entrypoint-initdb.d
    ports:
      # Only the container port is listed; no fixed host port is requested.
      - '6432'
    # Make the server listen on 6432, matching the port listed above.
    command: -p 6432
  clickhouse:
    image: "clickhouse/clickhouse-server:23-alpine@sha256:b078c1cd294632afa2aeba3530e7ba2e568513da23304354f455a25fab575c06"
    container_name: ydb_tests_fq_generic_clickhouse
    environment:
      CLICKHOUSE_DB: db
      CLICKHOUSE_USER: user
      CLICKHOUSE_DEFAULT_ACCESS_MANAGEMENT: 1
      CLICKHOUSE_PASSWORD: password
    volumes:
      # Scripts from ./clickhouse are mounted into the image's init directory.
      - ./clickhouse:/docker-entrypoint-initdb.d
    ports:
      # 8123 and 9000 are read as the HTTP and native ports by utils/settings.py.
      - '8123'
      - '9000'
  connector:
    image: "ghcr.io/ydb-platform/fq-connector-go:v0.1.1-rc.2@sha256:e5c2d86bce9cb43420eed0ed534afe760fb90ad41229dbbf34af28023b219af3"
    container_name: ydb_tests_fq_generic_fq-connector-go
    ports:
      # 50051 is read as the connector gRPC port by utils/settings.py.
      - '50051'
5 changes: 5 additions & 0 deletions ydb/tests/fq/generic/postgresql/init.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
-- Fixture data for the PostgreSQL side of the generic FQ integration tests.

-- Single-column table holding the values 1, 2 and 3.
CREATE TABLE simple_table (number INT);
INSERT INTO simple_table VALUES ((1)), ((2)), ((3));

-- Two-column (id, data) table with three rows: (1, 10), (2, 20), (3, 30).
CREATE TABLE join_table (id INT, data INT);
INSERT INTO join_table VALUES (1, 10), (2, 20), (3, 30);
45 changes: 45 additions & 0 deletions ydb/tests/fq/generic/test_clickhouse.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
import logging
import pytest

import ydb.public.api.protos.draft.fq_pb2 as fq
import ydb.public.api.protos.ydb_value_pb2 as ydb
from ydb.tests.tools.fq_runner.kikimr_utils import yq_v2

from ydb.tests.tools.fq_runner.fq_client import FederatedQueryClient
from utils.settings import Settings


class TestClickHouse:
    """Integration test: read a ClickHouse table through a federated query."""

    @yq_v2
    @pytest.mark.parametrize("fq_client", [{"folder_id": "my_folder"}], indirect=True)
    def test_simple(self, fq_client: FederatedQueryClient, settings: Settings):
        """Select every row of ``simple_table`` and check schema and values.

        Creates a ClickHouse connection, runs ``SELECT *`` as an analytics
        query, waits for it to complete and verifies that the single INT32
        column ``number`` contains exactly the values 1, 2 and 3.
        """
        table_name = 'simple_table'
        conn_name = f'conn_{table_name}'
        query_name = f'query_{table_name}'

        fq_client.create_clickhouse_connection(
            name=conn_name,
            database_name=settings.clickhouse.dbname,
            database_id='clickhouse_cluster_id',
            login=settings.clickhouse.username,
            password=settings.clickhouse.password,
        )

        sql = fR'''
SELECT *
FROM {conn_name}.{table_name};
'''

        query_id = fq_client.create_query(query_name, sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id
        fq_client.wait_query_status(query_id, fq.QueryMeta.COMPLETED)

        data = fq_client.get_result_data(query_id)
        result_set = data.result.result_set
        logging.debug(str(result_set))

        assert len(result_set.columns) == 1
        assert result_set.columns[0].name == "number"
        assert result_set.columns[0].type.type_id == ydb.Type.INT32

        assert len(result_set.rows) == 3
        # The query has no ORDER BY, so the row order is not guaranteed:
        # compare the values independently of the order they arrive in.
        values = sorted(row.items[0].int32_value for row in result_set.rows)
        assert values == [1, 2, 3]
58 changes: 58 additions & 0 deletions ydb/tests/fq/generic/test_join.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
import logging
import pytest

import ydb.public.api.protos.draft.fq_pb2 as fq
from ydb.tests.tools.fq_runner.kikimr_utils import yq_v2

from ydb.tests.tools.fq_runner.fq_client import FederatedQueryClient
from utils.settings import Settings


class TestJoin:
    """Integration test: join a PostgreSQL table with a ClickHouse table."""

    @yq_v2
    @pytest.mark.parametrize("fq_client", [{"folder_id": "my_folder"}], indirect=True)
    def test_simple(self, fq_client: FederatedQueryClient, settings: Settings):
        """Join ``join_table`` from both sources on ``id`` and check the result.

        Creates one PostgreSQL and one ClickHouse connection, runs the join as
        an analytics query, waits for it to complete and verifies that the
        result has the columns ``data_pg`` and ``data_ch`` and exactly the
        pairs (10, 10), (20, 20) and (30, 30).
        """
        table_name = 'join_table'
        ch_conn_name = f'ch_conn_{table_name}'
        pg_conn_name = f'pg_conn_{table_name}'
        query_name = f'query_{table_name}'

        fq_client.create_postgresql_connection(
            name=pg_conn_name,
            database_name=settings.postgresql.dbname,
            database_id='postgresql_cluster_id',
            login=settings.postgresql.username,
            password=settings.postgresql.password,
        )

        fq_client.create_clickhouse_connection(
            name=ch_conn_name,
            database_name=settings.clickhouse.dbname,
            database_id='clickhouse_cluster_id',
            login=settings.clickhouse.username,
            password=settings.clickhouse.password,
        )

        sql = fR'''
SELECT pg.data AS data_pg, ch.data AS data_ch
FROM {pg_conn_name}.{table_name} AS pg
JOIN {ch_conn_name}.{table_name} AS ch
ON pg.id = ch.id;
'''

        query_id = fq_client.create_query(query_name, sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id
        fq_client.wait_query_status(query_id, fq.QueryMeta.COMPLETED)

        data = fq_client.get_result_data(query_id)
        result_set = data.result.result_set
        logging.debug(str(result_set))

        assert len(result_set.columns) == 2
        assert result_set.columns[0].name == "data_pg"
        assert result_set.columns[1].name == "data_ch"

        assert len(result_set.rows) == 3
        # A JOIN without ORDER BY may return its rows in any order, so the
        # (data_pg, data_ch) pairs are compared independently of row order.
        pairs = sorted(
            (row.items[0].int32_value, row.items[1].int32_value)
            for row in result_set.rows
        )
        assert pairs == [(10, 10), (20, 20), (30, 30)]
47 changes: 47 additions & 0 deletions ydb/tests/fq/generic/test_postgresql.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
import logging
import pytest

import ydb.public.api.protos.draft.fq_pb2 as fq
import ydb.public.api.protos.ydb_value_pb2 as ydb
from ydb.tests.tools.fq_runner.kikimr_utils import yq_v2

from ydb.tests.tools.fq_runner.fq_client import FederatedQueryClient
from utils.settings import Settings


class TestPostgreSQL:
    """Integration test: read a PostgreSQL table through a federated query."""

    @yq_v2
    @pytest.mark.parametrize("fq_client", [{"folder_id": "my_folder"}], indirect=True)
    def test_simple(self, fq_client: FederatedQueryClient, settings: Settings):
        """Select every row of ``simple_table`` and check schema and values.

        Creates a PostgreSQL connection, runs ``SELECT *`` as an analytics
        query, waits for it to complete and verifies that the single column
        ``number`` is an optional INT32 containing exactly the values 1, 2
        and 3.
        """
        table_name = 'simple_table'
        conn_name = f'conn_{table_name}'
        query_name = f'query_{table_name}'

        fq_client.create_postgresql_connection(
            name=conn_name,
            database_name=settings.postgresql.dbname,
            database_id='postgresql_cluster_id',
            login=settings.postgresql.username,
            password=settings.postgresql.password,
        )

        sql = fR'''
SELECT *
FROM {conn_name}.{table_name};
'''

        query_id = fq_client.create_query(query_name, sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id
        fq_client.wait_query_status(query_id, fq.QueryMeta.COMPLETED)

        data = fq_client.get_result_data(query_id)
        result_set = data.result.result_set
        logging.debug(str(result_set))

        assert len(result_set.columns) == 1
        assert result_set.columns[0].name == "number"
        assert result_set.columns[0].type == ydb.Type(
            optional_type=ydb.OptionalType(item=ydb.Type(type_id=ydb.Type.INT32))
        )

        assert len(result_set.rows) == 3
        # The query has no ORDER BY, so the row order is not guaranteed:
        # compare the values independently of the order they arrive in.
        values = sorted(row.items[0].int32_value for row in result_set.rows)
        assert values == [1, 2, 3]
93 changes: 93 additions & 0 deletions ydb/tests/fq/generic/utils/settings.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
from os import environ
from dataclasses import dataclass
from typing import Optional

import yatest.common

from ydb.tests.tools.docker_compose_helpers.endpoint_determiner import EndpointDeterminer


@dataclass
class Settings:
    """Connection parameters for the services used by the generic FQ tests.

    Each service has its own nested dataclass; ``from_env`` assembles a
    complete instance from the docker-compose file and environment variables.
    """

    # --- per-service parameter groups ---

    @dataclass
    class Connector:
        grpc_host: str
        grpc_port: int

    @dataclass
    class MdbMock:
        endpoint: str

    @dataclass
    class TokenAccessorMock:
        endpoint: str
        hmac_secret_file: str

    @dataclass
    class ClickHouse:
        cluster_name: str
        dbname: str
        username: str
        password: str
        host: str
        http_port: int
        native_port: int
        protocol: str

    @dataclass
    class PostgreSQL:
        cluster_name: str
        dbname: str
        username: str
        password: Optional[str]
        host: str
        port: int

    # --- fields (declaration order defines the constructor's positional order) ---

    connector: Connector
    mdb_mock: MdbMock
    token_accessor_mock: TokenAccessorMock
    clickhouse: ClickHouse
    postgresql: PostgreSQL

    @classmethod
    def from_env(cls) -> 'Settings':
        """Build the settings for the docker-compose based test environment.

        Ports are looked up with ``EndpointDeterminer.get_port(service, port)``
        for the compose file ``ydb/tests/fq/generic/docker-compose.yml``
        (presumably the host port mapped to the given container port --
        confirm against EndpointDeterminer). The MDB mock and token accessor
        mock parameters come from the environment variables
        ``MDB_MOCK_ENDPOINT``, ``TOKEN_ACCESSOR_MOCK_ENDPOINT`` and
        ``TOKEN_ACCESSOR_HMAC_SECRET_FILE``.

        Raises:
            KeyError: if one of the environment variables above is not set.
        """
        compose_path = yatest.common.source_path('ydb/tests/fq/generic/docker-compose.yml')
        ports = EndpointDeterminer(compose_path)

        connector = cls.Connector(
            grpc_host='localhost',
            grpc_port=ports.get_port('connector', 50051),
        )
        mdb_mock = cls.MdbMock(endpoint=environ['MDB_MOCK_ENDPOINT'])
        token_accessor_mock = cls.TokenAccessorMock(
            endpoint=environ['TOKEN_ACCESSOR_MOCK_ENDPOINT'],
            hmac_secret_file=environ['TOKEN_ACCESSOR_HMAC_SECRET_FILE'],
        )
        clickhouse = cls.ClickHouse(
            cluster_name='clickhouse_integration_test',
            dbname='db',
            host='localhost',
            http_port=ports.get_port('clickhouse', 8123),
            native_port=ports.get_port('clickhouse', 9000),
            username='user',
            password='password',
            protocol='native',
        )
        postgresql = cls.PostgreSQL(
            cluster_name='postgresql_integration_test',
            dbname='db',
            host='localhost',
            port=ports.get_port('postgresql', 6432),
            username='user',
            password='password',
        )

        return cls(
            connector=connector,
            mdb_mock=mdb_mock,
            token_accessor_mock=token_accessor_mock,
            clickhouse=clickhouse,
            postgresql=postgresql,
        )
Loading

0 comments on commit a573b0c

Please sign in to comment.