Skip to content

Commit

Permalink
get_metering(): added async support (v2) (ydb-platform#3856)
Browse files (browse the repository at this point in the history)
  • Loading branch information
alchizhevsky authored Apr 22, 2024
1 parent 7df3337 commit b56212c
Show file tree
Hide file tree
Showing 7 changed files with 96 additions and 93 deletions.
8 changes: 4 additions & 4 deletions ydb/tests/fq/plans/test_stats_mode.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,21 +26,21 @@ def test_mode(self, kikimr, s3, client, stats_mode):
kikimr.control_plane.wait_bootstrap(1)
client.create_storage_connection("pb", "pbucket")

sql = R'''
sql = '''
insert into pb.`path/` with (format=csv_with_names)
select * from AS_TABLE([<|foo:1, bar:"xxx"u|>,<|foo:2, bar:"yyy"u|>]);
'''
query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id
client.wait_query_status(query_id, fq.QueryMeta.COMPLETED)

sql = R'''
sql = '''
insert into pb.`path/` with (format=csv_with_names)
select * from AS_TABLE([<|foo:3, bar:"xxx"u|>,<|foo:4, bar:"yyy"u|>]);
'''
query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id
client.wait_query_status(query_id, fq.QueryMeta.COMPLETED)

sql = R'''
sql = '''
select bar, count(foo) as foo_count, sum(foo) as foo_sum
from pb.`path/` with (format=csv_with_names, schema(
foo Int NOT NULL,
Expand All @@ -59,4 +59,4 @@ def test_mode(self, kikimr, s3, client, stats_mode):
assert len(result_set.rows) == 2
# assert result_set.rows[0].items[0].uint64_value == 1024 * 10
# 1024 x 1024 x 10 = 10 MB of raw data + little overhead for header, eols etc
# assert sum(kikimr.control_plane.get_metering()) == 11
# assert sum(kikimr.control_plane.get_metering(1)) == 11
60 changes: 30 additions & 30 deletions ydb/tests/fq/s3/test_insert.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,21 +38,21 @@ def test_insert(self, kikimr, s3, client, format, dataset_name, unique_prefix):
storage_connection_name = unique_prefix + "ibucket"
client.create_storage_connection(storage_connection_name, "insert_bucket")

sql = R'''
insert into `{}`.`{}/` with (format={})
sql = f'''
insert into `{storage_connection_name}`.`{dataset_name}/` with (format={format})
select * from AS_TABLE([<|foo:123, bar:"xxx"u|>,<|foo:456, bar:"yyy"u|>]);
'''.format(storage_connection_name, dataset_name, format)
'''

query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id
client.wait_query_status(query_id, fq.QueryMeta.COMPLETED)
prefix = client.describe_query(query_id).result.query.meta.last_job_id.split("-")[0] # cut _<query_id> part

sql = R'''
select foo, bar from {0}.`{1}/{3}*` with (format={2}, schema(
sql = f'''
select foo, bar from {storage_connection_name}.`{dataset_name}/{prefix}*` with (format={format}, schema(
foo Int NOT NULL,
bar String NOT NULL
))
'''.format(storage_connection_name, dataset_name, format, prefix)
'''

query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id
client.wait_query_status(query_id, fq.QueryMeta.COMPLETED)
Expand All @@ -70,7 +70,7 @@ def test_insert(self, kikimr, s3, client, format, dataset_name, unique_prefix):
assert result_set.rows[0].items[1].bytes_value == b'xxx'
assert result_set.rows[1].items[0].int32_value == 456
assert result_set.rows[1].items[1].bytes_value == b'yyy'
assert sum(kikimr.control_plane.get_metering()) == 20
assert sum(kikimr.control_plane.get_metering(1)) == 20

@yq_all
@pytest.mark.parametrize("client", [{"folder_id": "my_folder"}], indirect=True)
Expand All @@ -93,7 +93,7 @@ def test_big_json_list_insert(self, kikimr, s3, client, unique_prefix):
aws_secret_access_key="secret_key"
)

taxi = R'''VendorID'''
taxi = '''VendorID'''
for i in range(37):
taxi += "\n" + str(i)
s3_client.put_object(Body=taxi, Bucket='big_data_bucket', Key='src/taxi.csv', ContentType='text/plain')
Expand All @@ -119,7 +119,7 @@ def test_big_json_list_insert(self, kikimr, s3, client, unique_prefix):

client.create_storage_connection("ibucket", "insert_bucket")

sql = fR'''
sql = f'''
pragma s3.JsonListSizeLimit="10";
INSERT INTO bindings.`{storage_sink_binding_name}`
SELECT
Expand All @@ -130,7 +130,7 @@ def test_big_json_list_insert(self, kikimr, s3, client, unique_prefix):
query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id
client.wait_query_status(query_id, fq.QueryMeta.COMPLETED)

sql = fR'''
sql = f'''
SELECT
count(*)
Expand All @@ -148,7 +148,7 @@ def test_big_json_list_insert(self, kikimr, s3, client, unique_prefix):
assert result_set.columns[0].type.type_id == ydb.Type.UINT64
assert len(result_set.rows) == 1
assert result_set.rows[0].items[0].uint64_value == 37
assert sum(kikimr.control_plane.get_metering()) == 20
assert sum(kikimr.control_plane.get_metering(1)) == 20

@yq_all
@pytest.mark.parametrize("client", [{"folder_id": "my_folder"}], indirect=True)
Expand All @@ -167,7 +167,7 @@ def test_insert_csv_delimiter(self, kikimr, s3, client, unique_prefix):
storage_connection_name = unique_prefix + "ibucket"
client.create_storage_connection(storage_connection_name, "insert_bucket")

sql = fR'''
sql = f'''
insert into `{storage_connection_name}`.`csv_delim_out/` with (
format=csv_with_names,
csv_delimiter=";"
Expand All @@ -179,11 +179,11 @@ def test_insert_csv_delimiter(self, kikimr, s3, client, unique_prefix):
client.wait_query_status(query_id, fq.QueryMeta.COMPLETED)
prefix = "" # client.describe_query(query_id).result.query.meta.last_job_id.split("-")[0] # cut _<query_id> part

sql = R'''
select data from `{}`.`csv_delim_out/{}*` with (format=raw, schema(
sql = f'''
select data from `{storage_connection_name}`.`csv_delim_out/{prefix}*` with (format=raw, schema(
data String NOT NULL
))
'''.format(storage_connection_name, prefix)
'''

query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id
client.wait_query_status(query_id, fq.QueryMeta.COMPLETED)
Expand All @@ -196,7 +196,7 @@ def test_insert_csv_delimiter(self, kikimr, s3, client, unique_prefix):
assert result_set.columns[0].type.type_id == ydb.Type.STRING
assert len(result_set.rows) == 1
assert result_set.rows[0].items[0].bytes_value == b'"bar";"foo"\n"xxx";123\n"yyy";456\n'
assert sum(kikimr.control_plane.get_metering()) == 20
assert sum(kikimr.control_plane.get_metering(1)) == 20

@yq_all
@pytest.mark.parametrize("client", [{"folder_id": "my_folder"}], indirect=True)
Expand All @@ -215,23 +215,23 @@ def test_append(self, kikimr, s3, client, unique_prefix):
storage_connection_name = unique_prefix + "abucket"
client.create_storage_connection(storage_connection_name, "append_bucket")

sql = fR'''
sql = f'''
insert into `{storage_connection_name}`.`append/` with (format=json_each_row)
select * from AS_TABLE([<|foo:123, bar:"xxx"u|>,<|foo:456, bar:"yyy"u|>]);
'''

query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id
client.wait_query_status(query_id, fq.QueryMeta.COMPLETED)

sql = fR'''
sql = f'''
insert into `{storage_connection_name}`.`append/` with (format=json_each_row)
select * from AS_TABLE([<|foo:345, bar:"zzz"u|>]);
'''

query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id
client.wait_query_status(query_id, fq.QueryMeta.COMPLETED)

sql = fR'''
sql = f'''
select foo, bar from `{storage_connection_name}`.`append/` with (format=json_each_row, schema(
foo Int NOT NULL,
bar String NOT NULL
Expand All @@ -256,7 +256,7 @@ def test_append(self, kikimr, s3, client, unique_prefix):
assert result_set.rows[1].items[1].bytes_value == b'zzz'
assert result_set.rows[2].items[0].int32_value == 456
assert result_set.rows[2].items[1].bytes_value == b'yyy'
assert sum(kikimr.control_plane.get_metering()) == 30
assert sum(kikimr.control_plane.get_metering(1)) == 30

@yq_all
@pytest.mark.parametrize("client", [{"folder_id": "my_folder"}], indirect=True)
Expand All @@ -275,15 +275,15 @@ def test_part_split(self, kikimr, s3, client, unique_prefix):
storage_connection_name = unique_prefix + "sbucket"
client.create_storage_connection(storage_connection_name, "split_bucket")

sql = fR'''
sql = f'''
insert into `{storage_connection_name}`.`part/` with (format=json_each_row, partitioned_by=(foo, bar))
select * from AS_TABLE([<|foo:123, bar:"xxx"u, data:3.14|>,<|foo:456, bar:"yyy"u, data:2.72|>,<|foo:123, bar:"xxx"u, data:1.41|>]);
'''

query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id
client.wait_query_status(query_id, fq.QueryMeta.COMPLETED)

sql = fR'''
sql = f'''
select data from `{storage_connection_name}`.`part/foo=123/bar=xxx/` with (format=json_each_row, schema(
data Float NOT NULL,
))
Expand All @@ -301,7 +301,7 @@ def test_part_split(self, kikimr, s3, client, unique_prefix):
assert len(result_set.rows) == 2
assert abs(result_set.rows[0].items[0].float_value - 3.14) < 0.01
assert abs(result_set.rows[1].items[0].float_value - 1.41) < 0.01
assert sum(kikimr.control_plane.get_metering()) == 20
assert sum(kikimr.control_plane.get_metering(1)) == 20

@yq_all
@pytest.mark.parametrize("client", [{"folder_id": "my_folder"}], indirect=True)
Expand All @@ -320,23 +320,23 @@ def test_part_merge(self, kikimr, s3, client, unique_prefix):
storage_connection_name = unique_prefix + "mbucket"
client.create_storage_connection(storage_connection_name, "merge_bucket")

sql = fR'''
sql = f'''
insert into `{storage_connection_name}`.`part/foo=123/bar=xxx/` with (format=json_each_row)
select * from AS_TABLE([<|data:3.14|>,<|data:1.41|>]);
'''

query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id
client.wait_query_status(query_id, fq.QueryMeta.COMPLETED)

sql = fR'''
sql = f'''
insert into `{storage_connection_name}`.`part/foo=456/bar=yyy/` with (format=json_each_row)
select * from AS_TABLE([<|data:2.72|>]);
'''

query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id
client.wait_query_status(query_id, fq.QueryMeta.COMPLETED)

sql = fR'''
sql = f'''
select foo, bar, data from `{storage_connection_name}`.`part` with (format=json_each_row, partitioned_by=(foo, bar), schema(
foo Int NOT NULL,
bar String NOT NULL,
Expand Down Expand Up @@ -367,7 +367,7 @@ def test_part_merge(self, kikimr, s3, client, unique_prefix):
assert result_set.rows[2].items[0].int32_value == 456
assert result_set.rows[2].items[1].bytes_value == b'yyy'
assert abs(result_set.rows[2].items[2].float_value - 2.72) < 0.01
assert sum(kikimr.control_plane.get_metering()) == 30
assert sum(kikimr.control_plane.get_metering(1)) == 30

@yq_all
@pytest.mark.parametrize("format", ["json_list", "json_each_row", "csv_with_names"])
Expand Down Expand Up @@ -403,15 +403,15 @@ def test_part_binding(self, kikimr, s3, client, format, unique_prefix):
"file_pattern": "*{json,csv}"
})

sql = fR'''
sql = f'''
insert into bindings.`{storage_binding_name}`
select * from AS_TABLE([<|foo:123, bar:"xxx"u, data:3.14|>,<|foo:456, bar:"yyy"u, data:2.72|>,<|foo:123, bar:"xxx"u, data:1.41|>]);
'''

query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id
client.wait_query_status(query_id, fq.QueryMeta.COMPLETED)

sql = fR'''
sql = f'''
select foo, bar, data from bindings.`{storage_binding_name}` order by foo, data
'''

Expand All @@ -438,7 +438,7 @@ def test_part_binding(self, kikimr, s3, client, format, unique_prefix):
assert result_set.rows[2].items[0].int32_value == 456
assert result_set.rows[2].items[1].text_value == 'yyy'
assert abs(result_set.rows[2].items[2].double_value - 2.72) < 0.01
assert sum(kikimr.control_plane.get_metering()) == 20
assert sum(kikimr.control_plane.get_metering(1)) == 20

@yq_v1
@pytest.mark.parametrize("format", ["json_each_row", "csv_with_names", "tsv_with_names", "parquet"])
Expand Down
24 changes: 12 additions & 12 deletions ydb/tests/fq/s3/test_s3.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ def test_csv(self, kikimr, s3, client, runtime_listing, unique_prefix):
aws_secret_access_key="secret_key"
)

fruits = R'''Fruit,Price,Weight
fruits = '''Fruit,Price,Weight
Banana,3,100
Apple,2,22
Pear,15,33'''
Expand Down Expand Up @@ -81,7 +81,7 @@ def test_csv(self, kikimr, s3, client, runtime_listing, unique_prefix):
assert result_set.rows[2].items[0].bytes_value == b"Pear"
assert result_set.rows[2].items[1].int32_value == 15
assert result_set.rows[2].items[2].int32_value == 33
assert sum(kikimr.control_plane.get_metering()) == 10
assert sum(kikimr.control_plane.get_metering(1)) == 10

@yq_all
@pytest.mark.parametrize("client", [{"folder_id": "my_folder"}], indirect=True)
Expand All @@ -104,7 +104,7 @@ def test_csv_with_hopping(self, kikimr, s3, client, unique_prefix):
aws_secret_access_key="secret_key"
)

fruits = R'''Time,Fruit,Price
fruits = '''Time,Fruit,Price
0,Banana,3
1,Apple,2
2,Pear,15'''
Expand Down Expand Up @@ -195,7 +195,7 @@ def test_raw(self, kikimr, s3, client, runtime_listing, yq_version, unique_prefi
assert result_set.rows[0].items[0].bytes_value == b"text3"
assert result_set.rows[1].items[0].bytes_value == b"text2"
assert result_set.rows[2].items[0].bytes_value == b"text1"
assert sum(kikimr.control_plane.get_metering()) == 10
assert sum(kikimr.control_plane.get_metering(1)) == 10

@yq_all
@pytest.mark.parametrize("client", [{"folder_id": "my_folder"}], indirect=True)
Expand Down Expand Up @@ -491,7 +491,7 @@ def test_write_result(self, kikimr, s3, client, unique_prefix):
assert result_set.columns[2].name == "Weight"
assert result_set.columns[2].type.type_id == ydb.Type.INT64
assert len(result_set.rows) == 9
assert sum(kikimr.control_plane.get_metering()) == 10
assert sum(kikimr.control_plane.get_metering(1)) == 10

@yq_all
@pytest.mark.parametrize("client", [{"folder_id": "my_folder"}], indirect=True)
Expand Down Expand Up @@ -553,7 +553,7 @@ def test_precompute(self, kikimr, s3, client, runtime_listing, unique_prefix):
assert result_set.rows[0].items[0].uint64_value == 1
assert result_set.rows[1].items[0].uint64_value == 1
assert result_set.rows[2].items[0].uint64_value == 1
assert sum(kikimr.control_plane.get_metering()) == 10
assert sum(kikimr.control_plane.get_metering(1)) == 10

@yq_all
@pytest.mark.parametrize("client", [{"folder_id": "my_folder"}], indirect=True)
Expand Down Expand Up @@ -719,7 +719,7 @@ def test_simple_hits_47(self, kikimr, s3, client, runtime_listing, unique_prefix
assert result_set.columns[0].type.optional_type.item.type_id == ydb.Type.DOUBLE
assert len(result_set.rows) == 1
assert result_set.rows[0].items[0].double_value == 3
assert sum(kikimr.control_plane.get_metering()) == 10
assert sum(kikimr.control_plane.get_metering(1)) == 10

@yq_all
@pytest.mark.parametrize("client", [{"folder_id": "my_folder"}], indirect=True)
Expand Down Expand Up @@ -749,7 +749,7 @@ def test_i18n_unpartitioned(self, kikimr, s3, client, raw, path_pattern, runtime
i18n_directory = 'dataset/こんにちは/'
i18n_name = i18n_directory + 'fruitand&+ %непечатное.csv'

fruits = R'''Data
fruits = '''Data
101
102
103'''
Expand Down Expand Up @@ -788,7 +788,7 @@ def test_i18n_unpartitioned(self, kikimr, s3, client, raw, path_pattern, runtime
assert result_set.columns[0].type.type_id == ydb.Type.UINT64
assert len(result_set.rows) == 1
assert result_set.rows[0].items[0].uint64_value == 1 if raw else 3
assert sum(kikimr.control_plane.get_metering()) == 10
assert sum(kikimr.control_plane.get_metering(1)) == 10

@yq_all
@pytest.mark.parametrize("client", [{"folder_id": "my_folder"}], indirect=True)
Expand Down Expand Up @@ -817,7 +817,7 @@ def test_i18n_partitioning(self, kikimr, s3, client, raw, partitioning, runtime_

i18n_name = 'fruit and &{+}% непечатное.csv'

fruits = R'''Data
fruits = '''Data
101
102
103'''
Expand Down Expand Up @@ -883,7 +883,7 @@ def test_i18n_partitioning(self, kikimr, s3, client, raw, partitioning, runtime_
assert result_set.columns[0].type.type_id == ydb.Type.UINT64
assert len(result_set.rows) == 1
assert result_set.rows[0].items[0].uint64_value == 2 if raw else 6
assert sum(kikimr.control_plane.get_metering()) == 10
assert sum(kikimr.control_plane.get_metering(1)) == 10

@yq_all
@pytest.mark.parametrize("client", [{"folder_id": "my_folder"}], indirect=True)
Expand Down Expand Up @@ -934,4 +934,4 @@ def test_huge_source(self, kikimr, s3, client, runtime_listing, unique_prefix):
assert len(result_set.rows) == 1
assert result_set.rows[0].items[0].uint64_value == 1024 * 10
# 1024 x 1024 x 10 = 10 MB of raw data + little overhead for header, eols etc
assert sum(kikimr.control_plane.get_metering()) == 21
assert sum(kikimr.control_plane.get_metering(1)) == 21
Loading

0 comments on commit b56212c

Please sign in to comment.