Skip to content

Commit

Permalink
YDB FQ: add Greenplum to federated JOIN test (#8649)
Browse files (browse the repository at this point in the history)
  • Loading branch information
vitalyisaev2 authored Sep 3, 2024
1 parent be39b5a commit e396f16
Show file tree
Hide file tree
Showing 5 changed files with 55 additions and 35 deletions.
8 changes: 7 additions & 1 deletion ydb/core/fq/libs/actors/clusters_from_connections.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -122,12 +122,18 @@ void FillGenericClusterConfigBase(
FillClusterAuth(clusterCfg, connection.auth(), authToken, accountIdSignatures);
clusterCfg.SetUseSsl(!common.GetDisableSslForGenericDataSources());

// In YQv1 we just hardcode desired protocols here.
// In YQv1 we just hardcode the appropriate protocols here.
// In YQv2 protocol can be configured via `CREATE EXTERNAL DATA SOURCE` params.
switch (dataSourceKind) {
case NYql::NConnector::NApi::CLICKHOUSE:
clusterCfg.SetProtocol(common.GetUseNativeProtocolForClickHouse() ? NYql::NConnector::NApi::EProtocol::NATIVE : NYql::NConnector::NApi::EProtocol::HTTP);
break;
case NYql::NConnector::NApi::GREENPLUM:
clusterCfg.SetProtocol(NYql::NConnector::NApi::EProtocol::NATIVE);
break;
case NYql::NConnector::NApi::MYSQL:
clusterCfg.SetProtocol(NYql::NConnector::NApi::EProtocol::NATIVE);
break;
case NYql::NConnector::NApi::POSTGRESQL:
clusterCfg.SetProtocol(NYql::NConnector::NApi::EProtocol::NATIVE);
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ def __init__(self, docker_compose_yml_path: os.PathLike):
self.docker_compose_yml_path = docker_compose_yml_path

with open(self.docker_compose_yml_path) as f:
self.docker_compose_yml_data = yaml.load(f, Loader=yaml.FullLoader)
self.docker_compose_yml_data = yaml.load(f, Loader=yaml.SafeLoader)

def get_external_port(self, service_name: str, internal_port: int) -> int:
cmd = [
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,66 +8,80 @@
from ydb.tests.fq.generic.utils.settings import Settings


class TestJoin:
class TestJoinAnalytics:
@yq_all
@pytest.mark.parametrize("mvp_external_ydb_endpoint", [{"endpoint": "tests-fq-generic-ydb:2136"}], indirect=True)
@pytest.mark.parametrize("fq_client", [{"folder_id": "my_folder"}], indirect=True)
@pytest.mark.parametrize("query_type", [fq.QueryContent.QueryType.ANALYTICS, fq.QueryContent.QueryType.STREAMING])
def test_simple(self, fq_client: FederatedQueryClient, settings: Settings, query_type):
table_name = 'join_table'
ch_conn_name = f'ch_conn_{table_name}'
pg_conn_name = f'pg_conn_{table_name}'
ydb_conn_name = f'ydb_conn_{table_name}'
query_name = f'query_{table_name}'

fq_client.create_postgresql_connection(
name=pg_conn_name,
database_name=settings.postgresql.dbname,
database_id='postgresql_cluster_id',
login=settings.postgresql.username,
password=settings.postgresql.password,
)
table_name = "join_table"
ch_conn_name = f"ch_conn_{table_name}"
gp_conn_name = f"gp_conn_{table_name}"
pg_conn_name = f"pg_conn_{table_name}"
ydb_conn_name = f"ydb_conn_{table_name}"
query_name = f"query_{table_name}"

fq_client.create_clickhouse_connection(
name=ch_conn_name,
database_name=settings.clickhouse.dbname,
database_id='clickhouse_cluster_id',
database_id="clickhouse_cluster_id",
login=settings.clickhouse.username,
password=settings.clickhouse.password,
)

fq_client.create_greenplum_connection(
name=gp_conn_name,
database_name=settings.greenplum.dbname,
database_id="greenplum_cluster_id",
login=settings.greenplum.username,
password=settings.greenplum.password,
)

fq_client.create_postgresql_connection(
name=pg_conn_name,
database_name=settings.postgresql.dbname,
database_id="postgresql_cluster_id",
login=settings.postgresql.username,
password=settings.postgresql.password,
)

fq_client.create_ydb_connection(
name=ydb_conn_name,
database_id=settings.ydb.dbname,
)

# FIXME: research why test starts failing if we add Greenplum
sql = fR'''
SELECT pg.data AS data_pg, ch.data AS data_ch, ydb.data AS data_ydb
sql = Rf"""
SELECT pg.data AS data_pg, ch.data AS data_ch, ydb.data AS data_ydb, gp.data AS data_gp
FROM {pg_conn_name}.{table_name} AS pg
JOIN {ch_conn_name}.{table_name} AS ch
ON pg.id = ch.id
JOIN {ydb_conn_name}.{table_name} AS ydb
ON pg.id = ydb.id;
'''
ON pg.id = ydb.id
JOIN {gp_conn_name}.{table_name} AS gp
ON pg.id = gp.id;
"""

query_id = fq_client.create_query(query_name, sql, type=query_type).result.query_id
fq_client.wait_query_status(query_id, fq.QueryMeta.COMPLETED)

data = fq_client.get_result_data(query_id)
result_set = data.result.result_set
logging.debug(str(result_set))
assert len(result_set.columns) == 3
assert len(result_set.columns) == 4
assert result_set.columns[0].name == "data_pg"
assert result_set.columns[1].name == "data_ch"
assert result_set.columns[2].name == "data_ydb"
assert result_set.columns[3].name == "data_gp"
assert len(result_set.rows) == 3
assert result_set.rows[0].items[0].bytes_value == b'pg10'
assert result_set.rows[0].items[1].bytes_value == b'ch10'
assert result_set.rows[0].items[2].bytes_value == b'ydb10', result_set
assert result_set.rows[1].items[0].bytes_value == b'pg20'
assert result_set.rows[1].items[1].bytes_value == b'ch20'
assert result_set.rows[1].items[2].bytes_value == b'ydb20'
assert result_set.rows[2].items[0].bytes_value == b'pg30'
assert result_set.rows[2].items[1].bytes_value == b'ch30'
assert result_set.rows[2].items[2].bytes_value == b'ydb30'
assert result_set.rows[0].items[0].bytes_value == b"pg10"
assert result_set.rows[0].items[1].bytes_value == b"ch10"
assert result_set.rows[0].items[2].bytes_value == b"ydb10"
assert result_set.rows[0].items[3].bytes_value == b"gp10"
assert result_set.rows[1].items[0].bytes_value == b"pg20"
assert result_set.rows[1].items[1].bytes_value == b"ch20"
assert result_set.rows[1].items[2].bytes_value == b"ydb20"
assert result_set.rows[1].items[3].bytes_value == b"gp20"
assert result_set.rows[2].items[0].bytes_value == b"pg30"
assert result_set.rows[2].items[1].bytes_value == b"ch30"
assert result_set.rows[2].items[2].bytes_value == b"ydb30"
assert result_set.rows[2].items[3].bytes_value == b"gp30"
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,7 @@
]


class TestStreamingJoin(TestYdsBase):
class TestJoinStreaming(TestYdsBase):
@yq_v1
@pytest.mark.parametrize("mvp_external_ydb_endpoint", [{"endpoint": "tests-fq-generic-ydb:2136"}], indirect=True)
@pytest.mark.parametrize("fq_client", [{"folder_id": "my_folder"}], indirect=True)
Expand Down
4 changes: 2 additions & 2 deletions ydb/tests/fq/generic/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,9 @@ TEST_SRCS(
conftest.py
test_clickhouse.py
test_greenplum.py
test_join.py
test_postgresql.py
test_streaming_join.py
test_join_analytics.py
test_join_streaming.py
test_ydb.py
)

Expand Down

0 comments on commit e396f16

Please sign in to comment.