Test deleting from s3 by ttl #13533

Merged · 6 commits · Jan 20, 2025
164 changes: 164 additions & 0 deletions ydb/tests/olap/ttl_tiering/base.py
@@ -0,0 +1,164 @@
import yatest.common
import os
import time
import ydb
import logging
import boto3
import requests
from library.recipes import common as recipes_common

from ydb.tests.library.harness.kikimr_runner import KiKiMR
from ydb.tests.library.harness.kikimr_config import KikimrConfigGenerator


logger = logging.getLogger(__name__)


class S3Client:
def __init__(self, endpoint, region, key_id, key_secret):
self.endpoint = endpoint
self.region = region
self.key_id = key_id
self.key_secret = key_secret

session = boto3.session.Session()
self.s3 = session.resource(
service_name="s3",
aws_access_key_id=key_id,
aws_secret_access_key=key_secret,
region_name=region,
endpoint_url=endpoint
)
self.client = session.client(
service_name="s3",
aws_access_key_id=key_id,
aws_secret_access_key=key_secret,
region_name=region,
endpoint_url=endpoint
)

def create_bucket(self, name: str):
self.client.create_bucket(Bucket=name)

    def get_bucket_stat(self, bucket_name: str) -> tuple[int, int]:
bucket = self.s3.Bucket(bucket_name)
count = 0
size = 0
for obj in bucket.objects.all():
count += 1
size += obj.size
return (count, size)


class YdbClient:
def __init__(self, endpoint, database):
self.driver = ydb.Driver(endpoint=endpoint, database=database, oauth=None)
self.database = database
self.session_pool = ydb.QuerySessionPool(self.driver)

def stop(self):
self.session_pool.stop()
self.driver.stop()

def wait_connection(self, timeout=5):
self.driver.wait(timeout, fail_fast=True)

def query(self, statement):
return self.session_pool.execute_with_retries(statement)


class ColumnTableHelper:
def __init__(self, ydb_client: YdbClient, path: str):
self.ydb_client = ydb_client
self.path = path

def get_row_count(self) -> int:
return self.ydb_client.query(f"select count(*) as Rows from `{self.path}`")[0].rows[0]["Rows"]

def get_portion_count(self) -> int:
return self.ydb_client.query(f"select count(*) as Rows from `{self.path}/.sys/primary_index_portion_stats`")[0].rows[0]["Rows"]

def get_portion_stat_by_tier(self) -> dict[str, dict[str, int]]:
results = self.ydb_client.query(f"select TierName, sum(Rows) as Rows, count(*) as Portions from `{self.path}/.sys/primary_index_portion_stats` group by TierName")
return {row["TierName"]: {"Rows": row["Rows"], "Portions": row["Portions"]} for result_set in results for row in result_set.rows}

    def get_blob_stat_by_tier(self) -> dict[str, dict[str, int]]:
stmt = f"""
select TierName, count(*) as Portions, sum(BlobSize) as BlobSize, sum(BlobCount) as BlobCount from (
select TabletId, PortionId, TierName, sum(BlobRangeSize) as BlobSize, count(*) as BlobCount from `{self.path}/.sys/primary_index_stats` group by TabletId, PortionId, TierName
) group by TierName
"""
results = self.ydb_client.query(stmt)
return {row["TierName"]: {"Portions": row["Portions"], "BlobSize": row["BlobSize"], "BlobCount": row["BlobCount"]} for result_set in results for row in result_set.rows}


class TllTieringTestBase(object):
@classmethod
def setup_class(cls):
cls._setup_ydb()
cls._setup_s3()

@classmethod
def teardown_class(cls):
recipes_common.stop_daemon(cls.s3_pid)
cls.ydb_client.stop()
cls.cluster.stop()

@classmethod
def _setup_ydb(cls):
ydb_path = yatest.common.build_path(os.environ.get("YDB_DRIVER_BINARY"))
logger.info(yatest.common.execute([ydb_path, "-V"], wait=True).stdout.decode("utf-8"))
config = KikimrConfigGenerator(
extra_feature_flags={
"enable_external_data_sources": True,
"enable_tiering_in_column_shard": True
},
column_shard_config={
"lag_for_compaction_before_tierings_ms": 0,
"compaction_actualization_lag_ms": 0,
"optimizer_freshness_check_duration_ms": 0,
"small_portion_detect_size_limit": 0,
}
)
cls.cluster = KiKiMR(config)
cls.cluster.start()
node = cls.cluster.nodes[1]
cls.ydb_client = YdbClient(database=f"/{config.domain_name}", endpoint=f"grpc://{node.host}:{node.port}")
cls.ydb_client.wait_connection()

@classmethod
def _setup_s3(cls):
s3_pid_file = "s3.pid"
moto_server_path = os.environ["MOTO_SERVER_PATH"]

port_manager = yatest.common.network.PortManager()
port = port_manager.get_port()
endpoint = f"http://localhost:{port}"
command = [yatest.common.binary_path(moto_server_path), "s3", "--port", str(port)]

def is_s3_ready():
try:
response = requests.get(endpoint)
response.raise_for_status()
return True
except requests.RequestException as err:
logging.debug(err)
return False

recipes_common.start_daemon(
command=command, environment=None, is_alive_check=is_s3_ready, pid_file_name=s3_pid_file
)

with open(s3_pid_file, 'r') as f:
cls.s3_pid = int(f.read())

cls.s3_client = S3Client(endpoint, "us-east-1", "fake_key_id", "fake_key_secret")

@staticmethod
def wait_for(condition_func, timeout_seconds):
t0 = time.time()
while time.time() - t0 < timeout_seconds:
if condition_func():
return True
time.sleep(1)
return False
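
Aside (not part of the diff): a minimal sketch of how another suite could build on these helpers; the class name, table path, and data below are hypothetical, the real usage is ttl_delete_s3.py in this PR.

from ydb.tests.olap.ttl_tiering.base import TllTieringTestBase, ColumnTableHelper


class TestExampleTiering(TllTieringTestBase):
    def test_rows_visible(self):
        # hypothetical table under the test database
        table_path = f"{self.ydb_client.database}/example/table"
        self.ydb_client.query(f"""
            CREATE TABLE `{table_path}` (
                ts Timestamp NOT NULL,
                val Uint64,
                PRIMARY KEY(ts)
            ) WITH (STORE = COLUMN)
        """)
        self.ydb_client.query(
            f"UPSERT INTO `{table_path}` (ts, val) VALUES (CurrentUtcTimestamp(), 1)"
        )
        table = ColumnTableHelper(self.ydb_client, table_path)
        # wait_for polls once per second until the condition holds or the timeout expires
        assert self.wait_for(lambda: table.get_row_count() == 1, 60)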
163 changes: 163 additions & 0 deletions ydb/tests/olap/ttl_tiering/ttl_delete_s3.py
@@ -0,0 +1,163 @@
import time
import logging
from .base import TllTieringTestBase, ColumnTableHelper

logger = logging.getLogger(__name__)


class TestDeleteS3Ttl(TllTieringTestBase):
Collaborator:
I'm not entirely sure it makes sense to split this into two classes (TllTieringTestBase / TestDeleteS3Ttl), but it's your call.

Collaborator (author):
The base class is intended for reuse by other tiering tests.

''' Implements https://github.com/ydb-platform/ydb/issues/13467 '''

test_name = "delete_s3_ttl"
row_count = 10 ** 7
single_upsert_row_count = 10 ** 6
cold_bucket = "cold"
frozen_bucket = "frozen"
days_to_cool = 1000
days_to_freeze = 3000

@classmethod
def setup_class(cls):
super(TestDeleteS3Ttl, cls).setup_class()
cls.s3_client.create_bucket(cls.cold_bucket)
cls.s3_client.create_bucket(cls.frozen_bucket)

def get_row_count_by_date(self, table_path: str, past_days: int) -> int:
return self.ydb_client.query(f"SELECT count(*) as Rows from `{table_path}` WHERE ts < CurrentUtcTimestamp() - DateTime::IntervalFromDays({past_days})")[0].rows[0]["Rows"]

def test(self):
Collaborator:
Maybe add a comment describing what the test does, so readers don't have to go through all the code?

Collaborator (author):
The class docstring links to the description.

test_dir = f"{self.ydb_client.database}/{self.test_name}"
table_path = f"{test_dir}/table"
secret_prefix = self.test_name
access_key_id_secret_name = f"{secret_prefix}_key_id"
access_key_secret_secret_name = f"{secret_prefix}_key_secret"
cold_eds_path = f"{test_dir}/{self.cold_bucket}"
frozen_eds_path = f"{test_dir}/{self.frozen_bucket}"

# Expect empty buckets to avoid unintentional data deletion/modification
if self.s3_client.get_bucket_stat(self.cold_bucket) != (0, 0):
raise Exception("Bucket for cold data is not empty")
if self.s3_client.get_bucket_stat(self.frozen_bucket) != (0, 0):
raise Exception("Bucket for frozen data is not empty")

self.ydb_client.query(f"""
CREATE TABLE `{table_path}` (
ts Timestamp NOT NULL,
s String,
val Uint64,
PRIMARY KEY(ts),
)
WITH (STORE = COLUMN)
"""
)

logger.info(f"Table {table_path} created")

self.ydb_client.query(f"CREATE OBJECT {access_key_id_secret_name} (TYPE SECRET) WITH value='{self.s3_client.key_id}'")
self.ydb_client.query(f"CREATE OBJECT {access_key_secret_secret_name} (TYPE SECRET) WITH value='{self.s3_client.key_secret}'")

self.ydb_client.query(f"""
CREATE EXTERNAL DATA SOURCE `{cold_eds_path}` WITH (
SOURCE_TYPE="ObjectStorage",
LOCATION="{self.s3_client.endpoint}/{self.cold_bucket}",
AUTH_METHOD="AWS",
AWS_ACCESS_KEY_ID_SECRET_NAME="{access_key_id_secret_name}",
AWS_SECRET_ACCESS_KEY_SECRET_NAME="{access_key_secret_secret_name}",
AWS_REGION="{self.s3_client.region}"
)
""")

self.ydb_client.query(f"""
CREATE EXTERNAL DATA SOURCE `{frozen_eds_path}` WITH (
SOURCE_TYPE="ObjectStorage",
LOCATION="{self.s3_client.endpoint}/{self.frozen_bucket}",
AUTH_METHOD="AWS",
AWS_ACCESS_KEY_ID_SECRET_NAME="{access_key_id_secret_name}",
AWS_SECRET_ACCESS_KEY_SECRET_NAME="{access_key_secret_secret_name}",
AWS_REGION="{self.s3_client.region}"
)
""")
table = ColumnTableHelper(self.ydb_client, table_path)

cur_rows = 0
while cur_rows < self.row_count:
self.ydb_client.query("""
$row_count = %i;
$from_us = CAST(Timestamp('2010-01-01T00:00:00.000000Z') as Uint64);
$to_us = CAST(Timestamp('2030-01-01T00:00:00.000000Z') as Uint64);
$dt = $to_us - $from_us;
$k = ((1ul << 64) - 1) / CAST($dt - 1 as Double);
$rows= ListMap(ListFromRange(0, $row_count), ($i)->{
$us = CAST(RandomNumber($i) / $k as Uint64) + $from_us;
$ts = Unwrap(CAST($us as Timestamp));
return <|
ts: $ts,
s: 'some date:' || CAST($ts as String),
val: $us
|>;
});
upsert into `%s`
select * FROM AS_TABLE($rows);
""" % (min(self.row_count - cur_rows, self.single_upsert_row_count), table_path))
cur_rows = table.get_row_count()
logger.info(f"{cur_rows} rows inserted in total, portions: {table.get_portion_stat_by_tier()}, blobs: {table.get_blob_stat_by_tier()}")

logger.info(f"Rows older than {self.days_to_cool} days: {self.get_row_count_by_date(table_path, self.days_to_cool)}")
logger.info(f"Rows older than {self.days_to_freeze} days: {self.get_row_count_by_date(table_path, self.days_to_freeze)}")

def portions_actualized_in_sys():
portions = table.get_portion_stat_by_tier()
logger.info(f"portions: {portions}, blobs: {table.get_blob_stat_by_tier()}")
if len(portions) != 1 or "__DEFAULT" not in portions:
raise Exception("Data not in __DEFAULT teir")
return self.row_count <= portions["__DEFAULT"]["Rows"]

if not self.wait_for(lambda: portions_actualized_in_sys(), 120):
raise Exception(".sys reports incorrect data portions")

t0 = time.time()
stmt = f"""
ALTER TABLE `{table_path}` SET (TTL =
Interval("P{self.days_to_cool}D") TO EXTERNAL DATA SOURCE `{cold_eds_path}`,
Interval("P{self.days_to_freeze}D") TO EXTERNAL DATA SOURCE `{frozen_eds_path}`
ON ts
)
"""
logger.info(stmt)
self.ydb_client.query(stmt)
logger.info(f"TTL set in {time.time() - t0} seconds")

def data_distributes_across_tiers():
cold_bucket_stat = self.s3_client.get_bucket_stat(self.cold_bucket)
frozen_bucket_stat = self.s3_client.get_bucket_stat(self.frozen_bucket)
logger.info(f"portions: {table.get_portion_stat_by_tier()}, blobs: {table.get_blob_stat_by_tier()}, cold bucket stat: {cold_bucket_stat}, frozen bucket stat: {frozen_bucket_stat}")
# TODO FIXME
# We can not expect proper distribution of data across tiers due to https://github.com/ydb-platform/ydb/issues/13525
# So we wait until some data appears in any bucket
return cold_bucket_stat[0] != 0 or frozen_bucket_stat[0] != 0

if not self.wait_for(lambda: data_distributes_across_tiers(), 600):
raise Exception("Data eviction has not been started")

t0 = time.time()
stmt = f"""
ALTER TABLE `{table_path}` SET (TTL =
Interval("P{self.days_to_cool}D")
ON ts
)
"""
logger.info(stmt)
self.ydb_client.query(stmt)
logger.info(f"TTL set in {time.time() - t0} seconds")

# TODO FIXME after https://github.com/ydb-platform/ydb/issues/13523
def data_deleted_from_buckets():
cold_bucket_stat = self.s3_client.get_bucket_stat(self.cold_bucket)
frozen_bucket_stat = self.s3_client.get_bucket_stat(self.frozen_bucket)
logger.info(
f"portions: {table.get_portion_stat_by_tier()}, blobs: {table.get_blob_stat_by_tier()}, cold bucket stat: {cold_bucket_stat}, frozen bucket stat: {frozen_bucket_stat}")
return cold_bucket_stat[0] == 0 and frozen_bucket_stat[0] == 0

if not self.wait_for(lambda: data_deleted_from_buckets(), 120):
# raise Exception("not all data deleted") TODO FIXME after https://github.com/ydb-platform/ydb/issues/13535
pass
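
Aside (not part of the diff): the eviction and deletion checks above reduce to polling object counts in the buckets. A self-contained sketch of that polling pattern against an in-process moto S3 (assumes moto >= 5, which exposes mock_aws; older releases use mock_s3 instead):

import time

import boto3
from moto import mock_aws


@mock_aws
def demo():
    s3 = boto3.resource("s3", region_name="us-east-1")
    s3.create_bucket(Bucket="cold")

    def bucket_stat(name):
        # object count and total size, as in S3Client.get_bucket_stat
        objs = list(s3.Bucket(name).objects.all())
        return len(objs), sum(o.size for o in objs)

    def wait_for(cond, timeout_seconds):
        t0 = time.time()
        while time.time() - t0 < timeout_seconds:
            if cond():
                return True
            time.sleep(1)
        return False

    s3.Bucket("cold").put_object(Key="portion-1", Body=b"evicted blob")
    assert wait_for(lambda: bucket_stat("cold")[0] != 0, 10)


demo()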
27 changes: 27 additions & 0 deletions ydb/tests/olap/ttl_tiering/ya.make
@@ -0,0 +1,27 @@
PY3TEST()
ENV(YDB_DRIVER_BINARY="ydb/apps/ydbd/ydbd")
ENV(MOTO_SERVER_PATH="contrib/python/moto/bin/moto_server")
ENV(YDB_ADDITIONAL_LOG_CONFIGS="TX_TIERING:DEBUG")

TEST_SRCS(
base.py
ttl_delete_s3.py
)

SIZE(MEDIUM)

PEERDIR(
ydb/tests/library
ydb/public/sdk/python
ydb/public/sdk/python/enable_v3_new_behavior
contrib/python/boto3
library/recipes/common
)

DEPENDS(
ydb/apps/ydbd
contrib/python/moto/bin
)

END()

1 change: 1 addition & 0 deletions ydb/tests/olap/ya.make
@@ -3,4 +3,5 @@ RECURSE(
scenario
docs
load
ttl_tiering
)