Skip to content

Commit

Permalink
Merge branch 'ydb-platform:main' into YQ-2356.connector_integration_t…
Browse files Browse the repository at this point in the history
…ests
  • Loading branch information
tsmax2004 authored Jan 17, 2024
2 parents 7a68469 + a40932c commit 8cc7fdb
Show file tree
Hide file tree
Showing 28 changed files with 558 additions and 114 deletions.
1 change: 0 additions & 1 deletion .github/config/muted_ya.txt
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ ydb/core/tx/columnshard/ut_schema TColumnShardTestSchema.RebootForgetAfterFail
ydb/core/tx/columnshard/engines/ut *
ydb/core/tx/coordinator/ut Coordinator.RestoreTenantConfiguration
ydb/core/tx/schemeshard/ut_split_merge TSchemeShardSplitBySizeTest.Merge1KShards
ydb/library/yql/providers/generic/connector/tests test.py.test_select_positive_postgresql*
ydb/library/yql/sql/pg/ut PgSqlParsingAutoparam.AutoParamValues_DifferentTypes
ydb/library/yql/tests/sql/dq_file/part16 test.py.test[expr-as_dict_list_key-default.txt-Analyze]
ydb/library/yql/tests/sql/dq_file/part18 test.py.test[expr-cast_type_bind-default.txt-Analyze]
Expand Down
1 change: 0 additions & 1 deletion ydb/core/base/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,6 @@ PEERDIR(
ydb/core/graph/api
ydb/core/protos
ydb/core/protos/out
ydb/core/scheme
ydb/library/aclib
ydb/library/login
ydb/library/pdisk_io
Expand Down
84 changes: 24 additions & 60 deletions ydb/library/actors/http/http_proxy_sock_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -192,82 +192,46 @@ struct TSecureSocketImpl : TPlainSocketImpl, TSslHelpers {

void Flush() {}

ssize_t Send(const void* data, size_t size, bool& read, bool& write) {
ssize_t res = SSL_write(Ssl.Get(), data, size);
if (res < 0) {
res = SSL_get_error(Ssl.Get(), res);
switch(res) {
case SSL_ERROR_WANT_READ:
read = true;
return -EAGAIN;
case SSL_ERROR_WANT_WRITE:
write = true;
return -EAGAIN;
default:
return -EIO;
}
int ProcessSslResult(const int res, bool& read, bool& write) {
int err = SSL_get_error(Ssl.Get(), res); // SSL_get_error() must be used after each SSL_* operation
switch(err) {
case SSL_ERROR_NONE:
return res;
case SSL_ERROR_WANT_READ:
read = true;
return -EAGAIN;
case SSL_ERROR_WANT_WRITE:
write = true;
return -EAGAIN;
default:
return -EIO;
}
return res;
}

ssize_t Send(const void* data, size_t size, bool& read, bool& write) {
ERR_clear_error();
return ProcessSslResult(SSL_write(Ssl.Get(), data, size), read, write);
}

ssize_t Recv(void* data, size_t size, bool& read, bool& write) {
ssize_t res = SSL_read(Ssl.Get(), data, size);
if (res < 0) {
res = SSL_get_error(Ssl.Get(), res);
switch(res) {
case SSL_ERROR_WANT_READ:
read = true;
return -EAGAIN;
case SSL_ERROR_WANT_WRITE:
write = true;
return -EAGAIN;
default:
return -EIO;
}
}
return res;
ERR_clear_error();
return ProcessSslResult(SSL_read(Ssl.Get(), data, size), read, write);
}

int OnConnect(bool& read, bool& write) {
if (!Ssl) {
InitClientSsl();
}
int res = SSL_connect(Ssl.Get());
if (res <= 0) {
res = SSL_get_error(Ssl.Get(), res);
switch(res) {
case SSL_ERROR_WANT_READ:
read = true;
return -EAGAIN;
case SSL_ERROR_WANT_WRITE:
write = true;
return -EAGAIN;
default:
return -EIO;
}
}
return res;
ERR_clear_error();
return ProcessSslResult(SSL_connect(Ssl.Get()), read, write);
}

int OnAccept(std::shared_ptr<TPrivateEndpointInfo> endpoint, bool& read, bool& write) {
if (!Ssl) {
InitServerSsl(endpoint->SecureContext.Get());
}
int res = SSL_accept(Ssl.Get());
if (res <= 0) {
res = SSL_get_error(Ssl.Get(), res);
switch(res) {
case SSL_ERROR_WANT_READ:
read = true;
return -EAGAIN;
case SSL_ERROR_WANT_WRITE:
write = true;
return -EAGAIN;
default:
return -EIO;
}
}
return res;
ERR_clear_error();
return ProcessSslResult(SSL_accept(Ssl.Get()), read, write);
}
};

Expand Down
29 changes: 29 additions & 0 deletions ydb/library/yql/core/pg_settings/guc_settings.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
#include "guc_settings.h"

void TGUCSettings::Setup(const std::unordered_map<std::string, std::string>& runtimeSettings) {
RollbackSettings_ = runtimeSettings;
RollBack();
}

std::optional<std::string> TGUCSettings::Get(const std::string& key) const {
auto it = Settings_.find(key);
if (it == Settings_.end()) {
return std::nullopt;
}
return it->second;
}

void TGUCSettings::Set(const std::string& key, const std::string& val, bool isLocal) {
Settings_[key] = val;
if (!isLocal) {
SessionSettings_[key] = val;
}
}

void TGUCSettings::Commit() {
RollbackSettings_ = SessionSettings_;
}

void TGUCSettings::RollBack() {
Settings_ = SessionSettings_ = RollbackSettings_;
}
20 changes: 20 additions & 0 deletions ydb/library/yql/core/pg_settings/guc_settings.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
#pragma once
#include <unordered_map>
#include <vector>
#include <string>
#include <optional>
#include <memory>

class TGUCSettings {
public:
using TPtr = std::shared_ptr<TGUCSettings>;
void Setup(const std::unordered_map<std::string, std::string>& runtimeSettings);
std::optional<std::string> Get(const std::string&) const;
void Set(const std::string&, const std::string&, bool isLocal = false);
void Commit();
void RollBack();
private:
std::unordered_map<std::string, std::string> Settings_;
std::unordered_map<std::string, std::string> RollbackSettings_;
std::unordered_map<std::string, std::string> SessionSettings_;
};
7 changes: 7 additions & 0 deletions ydb/library/yql/core/pg_settings/ya.make
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
LIBRARY()

SRCS(
guc_settings.cpp
)

END()
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@ def settings() -> Settings:

@pytest.fixture
def clickhouse_client(settings) -> utils.clickhouse.Client:
return utils.clickhouse.make_client(settings.clickhouse)
client = utils.clickhouse.make_client(settings.clickhouse)
yield client
client.close()


@pytest.fixture
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,24 +2,26 @@ version: '3.4'
services:
postgresql:
image: postgres:15-bullseye@sha256:3411b9f2e5239cd7867f34fcf22fe964230f7d447a71d63c283e3593d3f84085
container_name: ${USER}_connector-integration-tests-postgresql
environment:
POSTGRES_DB: db
POSTGRES_USER: user
POSTGRES_PASSWORD: password
ports:
- 15432:5432
- 5432
clickhouse:
image: clickhouse/clickhouse-server:23-alpine@sha256:b078c1cd294632afa2aeba3530e7ba2e568513da23304354f455a25fab575c06
container_name: ${USER}_connector-integration-tests-clickhouse
environment:
CLICKHOUSE_DB: db
CLICKHOUSE_USER: user
CLICKHOUSE_DEFAULT_ACCESS_MANAGEMENT: 1
CLICKHOUSE_PASSWORD: password
ports:
- 19000:9000
- 18123:8123
- 9000
- 8123
fq-connector-go:
container_name: ${USER}_connector-integration-tests-fq-connector-go
image: ghcr.io/ydb-platform/fq-connector-go:v0.1.1@sha256:47e24df143aee31a83d4a4cd0acc20b4cab8c03a9c63e81a6e99cb017a31f916
ports:
- 50051:50051
network_mode: host
- 50051
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@ def make_client(s: Settings.ClickHouse) -> Client:
attempt += 1
try:
client = clickhouse_connect.get_client(
host=s.host, port=s.http_port, username=s.username, password=s.password
host=s.host_external, port=s.http_port_external, username=s.username, password=s.password
)
except Exception as e:
sys.stderr.write(f"attempt #{attempt}: {e}")
sys.stderr.write(f"attempt #{attempt}: {e}\n")
time.sleep(5)
continue

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
import os
import subprocess

import yatest.common


class EndpointDeterminer:
docker_compose_bin: os.PathLike
docker_compose_yml: os.PathLike

def __init__(self, docker_compose_yml: os.PathLike):
self.docker_compose_bin = yatest.common.build_path('library/recipes/docker_compose/bin/docker-compose')
self.docker_compose_yml = docker_compose_yml

def get_port(self, service_name: str, internal_port: int) -> int:
cmd = [self.docker_compose_bin, '-f', self.docker_compose_yml, 'port', service_name, str(internal_port)]
try:
out = subprocess.check_output(cmd, stderr=subprocess.STDOUT)
external_port = int(out.split(b':')[1])
return external_port
except subprocess.CalledProcessError as e:
raise RuntimeError(f"docker-compose error: {e.output} (code {e.returncode})")
10 changes: 5 additions & 5 deletions ydb/library/yql/providers/generic/connector/tests/utils/dqrun.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,17 +66,17 @@ class GatewaysConfRenderer:
{% for cluster in generic_settings.clickhouse_clusters %}
{% if cluster.protocol == EProtocol.NATIVE %}
{% set CLICKHOUSE_PORT = settings.clickhouse.native_port %}
{% set CLICKHOUSE_PORT = settings.clickhouse.native_port_internal %}
{% set CLICKHOUSE_PROTOCOL = NATIVE %}
{% elif cluster.protocol == EProtocol.HTTP %}
{% set CLICKHOUSE_PORT = settings.clickhouse.http_port %}
{% set CLICKHOUSE_PORT = settings.clickhouse.http_port_internal %}
{% set CLICKHOUSE_PROTOCOL = HTTP %}
{% endif %}
{{ data_source(
CLICKHOUSE,
settings.clickhouse.cluster_name,
settings.clickhouse.host,
settings.clickhouse.host_internal,
CLICKHOUSE_PORT,
settings.clickhouse.username,
settings.clickhouse.password,
Expand All @@ -90,8 +90,8 @@ class GatewaysConfRenderer:
{{ data_source(
POSTGRESQL,
settings.postgresql.cluster_name,
settings.postgresql.host,
settings.postgresql.port,
settings.postgresql.host_internal,
settings.postgresql.port_internal,
settings.postgresql.username,
settings.postgresql.password,
NATIVE,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,17 +52,17 @@ class SchemeRenderer:
{% for cluster in generic_settings.clickhouse_clusters %}
{% if cluster.protocol == EProtocol.NATIVE %}
{% set CLICKHOUSE_PORT = settings.clickhouse.native_port %}
{% set CLICKHOUSE_PORT = settings.clickhouse.native_port_internal %}
{% set CLICKHOUSE_PROTOCOL = NATIVE %}
{% elif cluster.protocol == EProtocol.HTTP %}
{% set CLICKHOUSE_PORT = settings.clickhouse.http_port %}
{% set CLICKHOUSE_PORT = settings.clickhouse.http_port_internal %}
{% set CLICKHOUSE_PROTOCOL = HTTP %}
{% endif %}
{{ create_data_source(
CLICKHOUSE,
settings.clickhouse.cluster_name,
settings.clickhouse.host,
settings.clickhouse.host_internal,
CLICKHOUSE_PORT,
settings.clickhouse.username,
settings.clickhouse.password,
Expand All @@ -76,8 +76,8 @@ class SchemeRenderer:
{{ create_data_source(
POSTGRESQL,
settings.postgresql.cluster_name,
settings.postgresql.host,
settings.postgresql.port,
settings.postgresql.host_internal,
settings.postgresql.port_internal,
settings.postgresql.username,
settings.postgresql.password,
NATIVE,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
from contextlib import contextmanager
import abc
import time
from datetime import datetime
from typing import Tuple
import sys

import pg8000.dbapi

Expand All @@ -16,17 +20,39 @@ def __init__(self, settings: Settings.PostgreSQL):

@contextmanager
def get_cursor(self, dbname: str):
conn = pg8000.dbapi.Connection(
user=self.settings.username,
password=self.settings.password,
host=self.settings.host,
port=self.settings.port,
database=dbname,
)
conn.autocommit = True

cur = conn.cursor()
yield conn, cur
conn, cursor = self._make_cursor(dbname=dbname)
yield conn, cursor
cursor.close()
conn.close()

def _make_cursor(self, dbname: str) -> Tuple[pg8000.dbapi.Connection, pg8000.dbapi.Cursor]:
start = datetime.now()
attempt = 0

while (datetime.now() - start).total_seconds() < 10:
attempt += 1
try:
sys.stdout.write(
f"Trying to connect PostgreSQL: {self.settings.host_external}:{self.settings.port_external}\n"
)
conn = pg8000.dbapi.Connection(
user=self.settings.username,
password=self.settings.password,
host=self.settings.host_external,
port=self.settings.port_external,
database=dbname,
timeout=1,
)
conn.autocommit = True

cur = conn.cursor()
return conn, cur
except Exception as e:
sys.stderr.write(f"attempt #{attempt} failed: {e} {e.args}\n")
time.sleep(3)
continue

raise Exception(f"Failed to connect PostgreSQL in {attempt} attempt(s)")


class Type(abc.ABC):
Expand Down
Loading

0 comments on commit 8cc7fdb

Please sign in to comment.