Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Multiprocess scenario test #13371

Merged
merged 6 commits into from
Jan 16, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions ydb/tests/olap/scenario/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@ def teardown_class(cls):
cls._ydb_instance.stop()

def test(self, ctx: TestContext):
test_path = ctx.test + get_external_param("table_suffix", "")
ScenarioTestHelper(None).remove_path(test_path, ctx.suite)
start_time = time.time()
try:
ctx.executable(self, ctx)
Expand All @@ -103,6 +105,7 @@ def test(self, ctx: TestContext):
allure_test_description(ctx.suite, ctx.test, start_time=start_time, end_time=time.time())
raise
allure_test_description(ctx.suite, ctx.test, start_time=start_time, end_time=time.time())
ScenarioTestHelper(None).remove_path(test_path, ctx.suite)


def pytest_generate_tests(metafunc):
Expand Down
21 changes: 11 additions & 10 deletions ydb/tests/olap/scenario/helpers/scenario_tests_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from abc import abstractmethod, ABC
from typing import Set, List, Dict, Any, Callable, Optional
from time import sleep
from ydb.tests.olap.lib.utils import get_external_param


class TestContext:
Expand Down Expand Up @@ -316,7 +317,7 @@ def _add_not_empty(p: str, dir: str):
result = os.path.join('/', YdbCluster.ydb_database, YdbCluster.tables_path)
if self.test_context is not None:
result = _add_not_empty(result, self.test_context.suite)
result = _add_not_empty(result, self.test_context.test)
result = _add_not_empty(result, self.test_context.test) + get_external_param("table_suffix", "")
result = _add_not_empty(result, path)
return result

Expand Down Expand Up @@ -463,7 +464,7 @@ def execute_scan_query(

@allure.step('Execute query')
def execute_query(
self, yql: str, expected_status: ydb.StatusCode | Set[ydb.StatusCode] = ydb.StatusCode.SUCCESS
self, yql: str, expected_status: ydb.StatusCode | Set[ydb.StatusCode] = ydb.StatusCode.SUCCESS, retries=0
):
"""Run a query on the tested database.

Expand All @@ -479,7 +480,7 @@ def execute_query(

allure.attach(yql, 'request', allure.attachment_type.TEXT)
with ydb.QuerySessionPool(YdbCluster.get_ydb_driver()) as pool:
self._run_with_expected_status(lambda: pool.execute_with_retries(yql), expected_status)
self._run_with_expected_status(lambda: pool.execute_with_retries(yql, None, ydb.RetrySettings(max_retries=retries)), expected_status)

def drop_if_exist(self, names: List[str], operation) -> None:
"""Erase entities in the tested database, if it exists.
Expand Down Expand Up @@ -653,7 +654,7 @@ def describe_table(self, path: str, settings: ydb.DescribeTableSettings = None)
)

@allure.step('List path {path}')
def list_path(self, path: str) -> List[ydb.SchemeEntry]:
def list_path(self, path: str, folder: str) -> List[ydb.SchemeEntry]:
"""Recursively describe the path in the database under test.

If the path is a directory or TableStore, then all subpaths are included in the description.
Expand All @@ -666,7 +667,7 @@ def list_path(self, path: str) -> List[ydb.SchemeEntry]:
If the path does not exist, an empty list is returned.
"""

root_path = self.get_full_path('')
root_path = self.get_full_path(folder)
try:
self_descr = YdbCluster._describe_path_impl(os.path.join(root_path, path))
except ydb.issues.SchemeError:
Expand All @@ -681,7 +682,7 @@ def list_path(self, path: str) -> List[ydb.SchemeEntry]:
return self_descr

@allure.step('Remove path {path}')
def remove_path(self, path: str) -> None:
def remove_path(self, path: str, folder: str = '') -> None:
"""Recursively delete a path in the tested database.

If the path is a directory or TableStore, then all nested paths are removed.
Expand All @@ -696,12 +697,12 @@ def remove_path(self, path: str) -> None:

import ydb.tests.olap.scenario.helpers.drop_helper as dh

root_path = self.get_full_path('')
for e in self.list_path(path):
root_path = self.get_full_path(folder)
for e in self.list_path(path, folder):
if e.is_any_table():
self.execute_scheme_query(dh.DropTable(e.name))
self.execute_scheme_query(dh.DropTable(os.path.join(folder, e.name)))
elif e.is_column_store():
self.execute_scheme_query(dh.DropTableStore(e.name))
self.execute_scheme_query(dh.DropTableStore(os.path.join(folder, e.name)))
elif e.is_directory():
self._run_with_expected_status(
lambda: YdbCluster.get_ydb_driver().scheme_client.remove_directory(os.path.join(root_path, e.name)),
Expand Down
2 changes: 2 additions & 0 deletions ydb/tests/olap/scenario/multitest.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
#!/bin/sh -e
make S3_ACCESS_KEY=$1 S3_SECRET_KEY=$2 YDB_ENDPOINT=$3 YDB_DB=$4 -rkj -f test.mk all.test.dst && echo OK || echo Error
6 changes: 6 additions & 0 deletions ydb/tests/olap/scenario/test.mk
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
suffixes:=$(shell jot 20)

$(suffixes:=.test.dst): %.test.dst:
../../../../ya test --build=relwithdebinfo --test-disable-timeout --test-param ydb-endpoint=$(YDB_ENDPOINT) --test-param ydb-db=$(YDB_DB) --test-param tables-path=scenario --test-param s3-endpoint=http://storage.yandexcloud.net --test-param s3-access-key=$(S3_ACCESS_KEY) --test-param s3-secret-key=$(S3_SECRET_KEY) --test-param s3-buckets=ydb-test-test,ydb-test-test-2 --test-param test-duration-seconds=2400 --test-param table_suffix=$* --test-param rows_count=100 --test-param batches_count=1000 --test-param reuse-tables=True --test-param keep-tables=True --test-param tables_count=10 --test-param ignore_read_errors=True -F test_insert.py::TestInsert::test[read_data_during_bulk_upsert]

all.test.dst: $(suffixes:=.test.dst)
98 changes: 61 additions & 37 deletions ydb/tests/olap/scenario/test_insert.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from helpers.thread_helper import TestThread
from ydb import PrimitiveType
from typing import List, Dict, Any
from ydb.tests.olap.lib.utils import get_external_param
from ydb.tests.olap.lib.utils import get_external_param, external_param_is_true


class TestInsert(BaseTestSet):
Expand All @@ -24,19 +24,26 @@ class TestInsert(BaseTestSet):
.with_key_columns("key")
)

def _loop_upsert(self, ctx: TestContext, data: list):
def _loop_upsert(self, ctx: TestContext, data: list, table: str):
sth = ScenarioTestHelper(ctx)
table_name = "log" + table
for batch in data:
sth.bulk_upsert_data("log", self.schema_log, batch)
sth.bulk_upsert_data(table_name, self.schema_log, batch)

def _loop_insert(self, ctx: TestContext, rows_count: int):
def _loop_insert(self, ctx: TestContext, rows_count: int, table: str, ignore_read_errors: bool):
sth = ScenarioTestHelper(ctx)
log: str = sth.get_full_path("log")
cnt: str = sth.get_full_path("cnt")
log: str = sth.get_full_path("log" + table)
cnt: str = sth.get_full_path("cnt" + table)
for i in range(rows_count):
sth.execute_query(
f'$cnt = SELECT CAST(COUNT(*) AS INT64) from `{log}`; INSERT INTO `{cnt}` (key, c) values({i}, $cnt)'
)
try:
sth.execute_query(
yql=f'$cnt = SELECT CAST(COUNT(*) AS INT64) from `{log}`; INSERT INTO `{cnt}` (key, c) values({i}, $cnt)', retries=10
)
except Exception:
if ignore_read_errors:
pass
else:
raise

def scenario_read_data_during_bulk_upsert(self, ctx: TestContext):
sth = ScenarioTestHelper(ctx)
Expand All @@ -45,42 +52,59 @@ def scenario_read_data_during_bulk_upsert(self, ctx: TestContext):
batches_count = int(get_external_param("batches_count", "10"))
rows_count = int(get_external_param("rows_count", "1000"))
inserts_count = int(get_external_param("inserts_count", "200"))
sth.execute_scheme_query(
CreateTable(cnt_table_name).with_schema(self.schema_cnt)
)
sth.execute_scheme_query(
CreateTable(log_table_name).with_schema(self.schema_log)
)
tables_count = int(get_external_param("tables_count", "1"))
ignore_read_errors = external_param_is_true("ignore_read_errors")
for table in range(tables_count):
sth.execute_scheme_query(
CreateTable(cnt_table_name + str(table)).with_schema(self.schema_cnt)
)
for table in range(tables_count):
sth.execute_scheme_query(
CreateTable(log_table_name + str(table)).with_schema(self.schema_log)
)
data: List = []
for i in range(batches_count):
batch: List[Dict[str, Any]] = []
for j in range(rows_count):
batch.append({"key": j + rows_count * i})
data.append(batch)

thread1 = TestThread(target=self._loop_upsert, args=[ctx, data])
thread2 = TestThread(target=self._loop_insert, args=[ctx, inserts_count])
thread1 = []
thread2 = []
for table in range(tables_count):
thread1.append(TestThread(target=self._loop_upsert, args=[ctx, data, str(table)]))
for table in range(tables_count):
thread2.append(TestThread(target=self._loop_insert, args=[ctx, inserts_count, str(table), ignore_read_errors]))

for thread in thread1:
thread.start()

thread1.start()
thread2.start()
for thread in thread2:
thread.start()

thread2.join()
thread1.join()
for thread in thread2:
thread.join()

rows: int = sth.get_table_rows_count(cnt_table_name)
assert rows == inserts_count
scan_result = sth.execute_scan_query(
f"SELECT key, c FROM `{sth.get_full_path(cnt_table_name)}` ORDER BY key"
)
for i in range(rows):
if scan_result.result_set.rows[i]["key"] != i:
assert False, f"{i} ?= {scan_result.result_set.rows[i]['key']}"
for thread in thread1:
thread.join()

rows: int = sth.get_table_rows_count(log_table_name)
assert rows == rows_count * batches_count
scan_result = sth.execute_scan_query(
f"SELECT key FROM `{sth.get_full_path(log_table_name)}` ORDER BY key"
)
for i in range(rows):
if scan_result.result_set.rows[i]["key"] != i:
assert False, f"{i} ?= {scan_result.result_set.rows[i]['key']}"
for table in range(tables_count):
cnt_table_name0 = cnt_table_name + str(table)
rows: int = sth.get_table_rows_count(cnt_table_name0)
assert rows == inserts_count
scan_result = sth.execute_scan_query(
f"SELECT key, c FROM `{sth.get_full_path(cnt_table_name0)}` ORDER BY key"
)
for i in range(rows):
if scan_result.result_set.rows[i]["key"] != i:
assert False, f"{i} ?= {scan_result.result_set.rows[i]['key']}"

log_table_name0 = log_table_name + str(table)
rows: int = sth.get_table_rows_count(log_table_name0)
assert rows == rows_count * batches_count
scan_result = sth.execute_scan_query(
f"SELECT key FROM `{sth.get_full_path(log_table_name0)}` ORDER BY key"
)
for i in range(rows):
if scan_result.result_set.rows[i]["key"] != i:
assert False, f"{i} ?= {scan_result.result_set.rows[i]['key']}"
Loading