diff --git a/ydb/tests/olap/ttl_tiering/base.py b/ydb/tests/olap/ttl_tiering/base.py
new file mode 100644
index 000000000000..3685c1c436af
--- /dev/null
+++ b/ydb/tests/olap/ttl_tiering/base.py
@@ -0,0 +1,164 @@
+import yatest.common
+import os
+import time
+import ydb
+import logging
+import boto3
+import requests
+from library.recipes import common as recipes_common
+
+from ydb.tests.library.harness.kikimr_runner import KiKiMR
+from ydb.tests.library.harness.kikimr_config import KikimrConfigGenerator
+
+
+logger = logging.getLogger(__name__)
+
+
+class S3Client:
+    def __init__(self, endpoint, region, key_id, key_secret):
+        self.endpoint = endpoint
+        self.region = region
+        self.key_id = key_id
+        self.key_secret = key_secret
+
+        session = boto3.session.Session()
+        self.s3 = session.resource(
+            service_name="s3",
+            aws_access_key_id=key_id,
+            aws_secret_access_key=key_secret,
+            region_name=region,
+            endpoint_url=endpoint
+        )
+        self.client = session.client(
+            service_name="s3",
+            aws_access_key_id=key_id,
+            aws_secret_access_key=key_secret,
+            region_name=region,
+            endpoint_url=endpoint
+        )
+
+    def create_bucket(self, name: str):
+        self.client.create_bucket(Bucket=name)
+
+    def get_bucket_stat(self, bucket_name: str) -> tuple[int, int]:
+        bucket = self.s3.Bucket(bucket_name)
+        count = 0
+        size = 0
+        for obj in bucket.objects.all():
+            count += 1
+            size += obj.size
+        return (count, size)
+
+
+class YdbClient:
+    def __init__(self, endpoint, database):
+        self.driver = ydb.Driver(endpoint=endpoint, database=database, oauth=None)
+        self.database = database
+        self.session_pool = ydb.QuerySessionPool(self.driver)
+
+    def stop(self):
+        self.session_pool.stop()
+        self.driver.stop()
+
+    def wait_connection(self, timeout=5):
+        self.driver.wait(timeout, fail_fast=True)
+
+    def query(self, statement):
+        return self.session_pool.execute_with_retries(statement)
+
+
+class ColumnTableHelper:
+    def __init__(self, ydb_client: YdbClient, path: str):
+        self.ydb_client = ydb_client
+        self.path = path
+
+    def get_row_count(self) -> int:
+        return self.ydb_client.query(f"select count(*) as Rows from `{self.path}`")[0].rows[0]["Rows"]
+
+    def get_portion_count(self) -> int:
+        return self.ydb_client.query(f"select count(*) as Rows from `{self.path}/.sys/primary_index_portion_stats`")[0].rows[0]["Rows"]
+
+    def get_portion_stat_by_tier(self) -> dict[str, dict[str, int]]:
+        results = self.ydb_client.query(f"select TierName, sum(Rows) as Rows, count(*) as Portions from `{self.path}/.sys/primary_index_portion_stats` group by TierName")
+        return {row["TierName"]: {"Rows": row["Rows"], "Portions": row["Portions"]} for result_set in results for row in result_set.rows}
+
+    def get_blob_stat_by_tier(self) -> dict[str, dict[str, int]]:
+        stmt = f"""
+            select TierName, count(*) as Portions, sum(BlobSize) as BlobSize, sum(BlobCount) as BlobCount from (
+                select TabletId, PortionId, TierName, sum(BlobRangeSize) as BlobSize, count(*) as BlobCount from `{self.path}/.sys/primary_index_stats` group by TabletId, PortionId, TierName
+            ) group by TierName
+        """
+        results = self.ydb_client.query(stmt)
+        return {row["TierName"]: {"Portions": row["Portions"], "BlobSize": row["BlobSize"], "BlobCount": row["BlobCount"]} for result_set in results for row in result_set.rows}
+
+
+class TllTieringTestBase(object):
+    @classmethod
+    def setup_class(cls):
+        cls._setup_ydb()
+        cls._setup_s3()
+
+    @classmethod
+    def teardown_class(cls):
+        recipes_common.stop_daemon(cls.s3_pid)
+        cls.ydb_client.stop()
+        cls.cluster.stop()
+
+    @classmethod
+    def _setup_ydb(cls):
+        ydb_path = yatest.common.build_path(os.environ.get("YDB_DRIVER_BINARY"))
+        logger.info(yatest.common.execute([ydb_path, "-V"], wait=True).stdout.decode("utf-8"))
+        config = KikimrConfigGenerator(
+            extra_feature_flags={
+                "enable_external_data_sources": True,
+                "enable_tiering_in_column_shard": True
+            },
+            column_shard_config={
+                "lag_for_compaction_before_tierings_ms": 0,
+                "compaction_actualization_lag_ms": 0,
+                "optimizer_freshness_check_duration_ms": 0,
+                "small_portion_detect_size_limit": 0,
+            }
+        )
+        cls.cluster = KiKiMR(config)
+        cls.cluster.start()
+        node = cls.cluster.nodes[1]
+        cls.ydb_client = YdbClient(database=f"/{config.domain_name}", endpoint=f"grpc://{node.host}:{node.port}")
+        cls.ydb_client.wait_connection()
+
+    @classmethod
+    def _setup_s3(cls):
+        s3_pid_file = "s3.pid"
+        moto_server_path = os.environ["MOTO_SERVER_PATH"]
+
+        port_manager = yatest.common.network.PortManager()
+        port = port_manager.get_port()
+        endpoint = f"http://localhost:{port}"
+        command = [yatest.common.binary_path(moto_server_path), "s3", "--port", str(port)]
+
+        def is_s3_ready():
+            try:
+                response = requests.get(endpoint)
+                response.raise_for_status()
+                return True
+            except requests.RequestException as err:
+                logger.debug(err)
+                return False
+
+        recipes_common.start_daemon(
+            command=command, environment=None, is_alive_check=is_s3_ready, pid_file_name=s3_pid_file
+        )
+
+        with open(s3_pid_file, 'r') as f:
+            cls.s3_pid = int(f.read())
+
+        cls.s3_client = S3Client(endpoint, "us-east-1", "fake_key_id", "fake_key_secret")
+
+    @staticmethod
+    def wait_for(condition_func, timeout_seconds):
+        t0 = time.time()
+        while time.time() - t0 < timeout_seconds:
+            if condition_func():
+                return True
+            time.sleep(1)
+        return False
diff --git a/ydb/tests/olap/ttl_tiering/ttl_delete_s3.py b/ydb/tests/olap/ttl_tiering/ttl_delete_s3.py
new file mode 100644
index 000000000000..56c28b8a2f83
--- /dev/null
+++ b/ydb/tests/olap/ttl_tiering/ttl_delete_s3.py
@@ -0,0 +1,163 @@
+import time
+import logging
+from .base import TllTieringTestBase, ColumnTableHelper
+
+logger = logging.getLogger(__name__)
+
+
+class TestDeleteS3Ttl(TllTieringTestBase):
+    ''' Implements https://github.com/ydb-platform/ydb/issues/13467 '''
+
+    test_name = "delete_s3_ttl"
+    row_count = 10 ** 7
+    single_upsert_row_count = 10 ** 6
+    cold_bucket = "cold"
+    frozen_bucket = "frozen"
+    days_to_cool = 1000
+    days_to_freeze = 3000
+
+    @classmethod
+    def setup_class(cls):
+        super(TestDeleteS3Ttl, cls).setup_class()
+        cls.s3_client.create_bucket(cls.cold_bucket)
+        cls.s3_client.create_bucket(cls.frozen_bucket)
+
+    def get_row_count_by_date(self, table_path: str, past_days: int) -> int:
+        return self.ydb_client.query(f"SELECT count(*) as Rows from `{table_path}` WHERE ts < CurrentUtcTimestamp() - DateTime::IntervalFromDays({past_days})")[0].rows[0]["Rows"]
+
+    def test(self):
+        test_dir = f"{self.ydb_client.database}/{self.test_name}"
+        table_path = f"{test_dir}/table"
+        secret_prefix = self.test_name
+        access_key_id_secret_name = f"{secret_prefix}_key_id"
+        access_key_secret_secret_name = f"{secret_prefix}_key_secret"
+        cold_eds_path = f"{test_dir}/{self.cold_bucket}"
+        frozen_eds_path = f"{test_dir}/{self.frozen_bucket}"
+
+        # Expect empty buckets to avoid unintentional data deletion/modification
+        if self.s3_client.get_bucket_stat(self.cold_bucket) != (0, 0):
+            raise Exception("Bucket for cold data is not empty")
+        if self.s3_client.get_bucket_stat(self.frozen_bucket) != (0, 0):
+            raise Exception("Bucket for frozen data is not empty")
+
+        self.ydb_client.query(f"""
+            CREATE TABLE `{table_path}` (
+                ts Timestamp NOT NULL,
+                s String,
+                val Uint64,
+                PRIMARY KEY(ts),
+            )
+            WITH (STORE = COLUMN)
+            """
+        )
+
+        logger.info(f"Table {table_path} created")
+
+        self.ydb_client.query(f"CREATE OBJECT {access_key_id_secret_name} (TYPE SECRET) WITH value='{self.s3_client.key_id}'")
+        self.ydb_client.query(f"CREATE OBJECT {access_key_secret_secret_name} (TYPE SECRET) WITH value='{self.s3_client.key_secret}'")
+
+        self.ydb_client.query(f"""
+            CREATE EXTERNAL DATA SOURCE `{cold_eds_path}` WITH (
+                SOURCE_TYPE="ObjectStorage",
+                LOCATION="{self.s3_client.endpoint}/{self.cold_bucket}",
+                AUTH_METHOD="AWS",
+                AWS_ACCESS_KEY_ID_SECRET_NAME="{access_key_id_secret_name}",
+                AWS_SECRET_ACCESS_KEY_SECRET_NAME="{access_key_secret_secret_name}",
+                AWS_REGION="{self.s3_client.region}"
+            )
+        """)
+
+        self.ydb_client.query(f"""
+            CREATE EXTERNAL DATA SOURCE `{frozen_eds_path}` WITH (
+                SOURCE_TYPE="ObjectStorage",
+                LOCATION="{self.s3_client.endpoint}/{self.frozen_bucket}",
+                AUTH_METHOD="AWS",
+                AWS_ACCESS_KEY_ID_SECRET_NAME="{access_key_id_secret_name}",
+                AWS_SECRET_ACCESS_KEY_SECRET_NAME="{access_key_secret_secret_name}",
+                AWS_REGION="{self.s3_client.region}"
+            )
+        """)
+        table = ColumnTableHelper(self.ydb_client, table_path)
+
+        cur_rows = 0
+        while cur_rows < self.row_count:
+            self.ydb_client.query("""
+                $row_count = %i;
+                $from_us = CAST(Timestamp('2010-01-01T00:00:00.000000Z') as Uint64);
+                $to_us = CAST(Timestamp('2030-01-01T00:00:00.000000Z') as Uint64);
+                $dt = $to_us - $from_us;
+                $k = ((1ul << 64) - 1) / CAST($dt - 1 as Double);
+                $rows= ListMap(ListFromRange(0, $row_count), ($i)->{
+                    $us = CAST(RandomNumber($i) / $k as Uint64) + $from_us;
+                    $ts = Unwrap(CAST($us as Timestamp));
+                    return <|
+                        ts: $ts,
+                        s: 'some date:' || CAST($ts as String),
+                        val: $us
+                    |>;
+                });
+                upsert into `%s`
+                select * FROM AS_TABLE($rows);
+            """ % (min(self.row_count - cur_rows, self.single_upsert_row_count), table_path))
+            cur_rows = table.get_row_count()
+            logger.info(f"{cur_rows} rows inserted in total, portions: {table.get_portion_stat_by_tier()}, blobs: {table.get_blob_stat_by_tier()}")
+
+        logger.info(f"Rows older than {self.days_to_cool} days: {self.get_row_count_by_date(table_path, self.days_to_cool)}")
+        logger.info(f"Rows older than {self.days_to_freeze} days: {self.get_row_count_by_date(table_path, self.days_to_freeze)}")
+
+        def portions_actualized_in_sys():
+            portions = table.get_portion_stat_by_tier()
+            logger.info(f"portions: {portions}, blobs: {table.get_blob_stat_by_tier()}")
+            if len(portions) != 1 or "__DEFAULT" not in portions:
+                raise Exception("Data not in __DEFAULT tier")
+            return self.row_count <= portions["__DEFAULT"]["Rows"]
+
+        if not self.wait_for(portions_actualized_in_sys, 120):
+            raise Exception(".sys reports incorrect data portions")
+
+        t0 = time.time()
+        stmt = f"""
+            ALTER TABLE `{table_path}` SET (TTL =
+                Interval("P{self.days_to_cool}D") TO EXTERNAL DATA SOURCE `{cold_eds_path}`,
+                Interval("P{self.days_to_freeze}D") TO EXTERNAL DATA SOURCE `{frozen_eds_path}`
+                ON ts
+            )
+        """
+        logger.info(stmt)
+        self.ydb_client.query(stmt)
+        logger.info(f"TTL set in {time.time() - t0} seconds")
+
+        def data_distributes_across_tiers():
+            cold_bucket_stat = self.s3_client.get_bucket_stat(self.cold_bucket)
+            frozen_bucket_stat = self.s3_client.get_bucket_stat(self.frozen_bucket)
+            logger.info(f"portions: {table.get_portion_stat_by_tier()}, blobs: {table.get_blob_stat_by_tier()}, cold bucket stat: {cold_bucket_stat}, frozen bucket stat: {frozen_bucket_stat}")
+            # TODO FIXME
+            # We can not expect proper distribution of data across tiers due to https://github.com/ydb-platform/ydb/issues/13525
+            # So we wait until some data appears in any bucket
+            return cold_bucket_stat[0] != 0 or frozen_bucket_stat[0] != 0
+
+        if not self.wait_for(data_distributes_across_tiers, 600):
+            raise Exception("Data eviction has not been started")
+
+        t0 = time.time()
+        stmt = f"""
+            ALTER TABLE `{table_path}` SET (TTL =
+                Interval("P{self.days_to_cool}D")
+                ON ts
+            )
+        """
+        logger.info(stmt)
+        self.ydb_client.query(stmt)
+        logger.info(f"TTL set in {time.time() - t0} seconds")
+
+        # TODO FIXME after https://github.com/ydb-platform/ydb/issues/13523
+        def data_deleted_from_buckets():
+            cold_bucket_stat = self.s3_client.get_bucket_stat(self.cold_bucket)
+            frozen_bucket_stat = self.s3_client.get_bucket_stat(self.frozen_bucket)
+            logger.info(
+                f"portions: {table.get_portion_stat_by_tier()}, blobs: {table.get_blob_stat_by_tier()}, cold bucket stat: {cold_bucket_stat}, frozen bucket stat: {frozen_bucket_stat}")
+            return cold_bucket_stat[0] == 0 and frozen_bucket_stat[0] == 0
+
+        if not self.wait_for(data_deleted_from_buckets, 120):
+            # raise Exception("not all data deleted") TODO FIXME after https://github.com/ydb-platform/ydb/issues/13535
+            pass
diff --git a/ydb/tests/olap/ttl_tiering/ya.make b/ydb/tests/olap/ttl_tiering/ya.make
new file mode 100644
index 000000000000..a871dbc73603
--- /dev/null
+++ b/ydb/tests/olap/ttl_tiering/ya.make
@@ -0,0 +1,27 @@
+PY3TEST()
+ENV(YDB_DRIVER_BINARY="ydb/apps/ydbd/ydbd")
+ENV(MOTO_SERVER_PATH="contrib/python/moto/bin/moto_server")
+ENV(YDB_ADDITIONAL_LOG_CONFIGS="TX_TIERING:DEBUG")
+
+TEST_SRCS(
+    base.py
+    ttl_delete_s3.py
+)
+
+SIZE(MEDIUM)
+
+PEERDIR(
+    ydb/tests/library
+    ydb/public/sdk/python
+    ydb/public/sdk/python/enable_v3_new_behavior
+    contrib/python/boto3
+    library/recipes/common
+)
+
+DEPENDS(
+    ydb/apps/ydbd
+    contrib/python/moto/bin
+)
+
+END()
+
diff --git a/ydb/tests/olap/ya.make b/ydb/tests/olap/ya.make
index 452f348f3f75..f827ee5609b7 100644
--- a/ydb/tests/olap/ya.make
+++ b/ydb/tests/olap/ya.make
@@ -3,4 +3,5 @@ RECURSE(
     scenario
     docs
     load
+    ttl_tiering
 )