diff --git a/ydb/core/fq/libs/control_plane_storage/internal/utils.cpp b/ydb/core/fq/libs/control_plane_storage/internal/utils.cpp index 8f64f8d4c94a..569fd7079635 100644 --- a/ydb/core/fq/libs/control_plane_storage/internal/utils.cpp +++ b/ydb/core/fq/libs/control_plane_storage/internal/utils.cpp @@ -128,6 +128,11 @@ std::vector GetMeteringRecords(const TString& statistics, bool billable if (auto* ingressNode = graph.second.GetValueByPath("IngressBytes.sum")) { ingress += ingressNode->GetIntegerSafe(); } + // special exclusion for PQ/YDS in YQv1 + if (auto* pqIngressNode = graph.second.GetValueByPath("TaskRunner.Source=PqSource.Stage=Total.IngressBytes.sum")) { + ui64 pqIngress = pqIngressNode->GetIntegerSafe(); + ingress = ingress > pqIngress ? (ingress - pqIngress) : 0; + } } } diff --git a/ydb/tests/fq/yds/test_kill_pq_bill.py b/ydb/tests/fq/yds/test_kill_pq_bill.py new file mode 100644 index 000000000000..e031268aad86 --- /dev/null +++ b/ydb/tests/fq/yds/test_kill_pq_bill.py @@ -0,0 +1,50 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +import logging +import time +import json + +from ydb.tests.tools.fq_runner.kikimr_utils import yq_v1 +from ydb.tests.tools.datastreams_helpers.test_yds_base import TestYdsBase + +import ydb.public.api.protos.draft.fq_pb2 as fq + + +class TestKillPqBill(TestYdsBase): + @yq_v1 + def test_do_not_bill_pq(self, kikimr, client): + self.init_topics("no_pq_bill") + + sql = R''' + PRAGMA dq.MaxTasksPerStage="2"; + + INSERT INTO yds.`{output_topic}` + SELECT Data AS Data + FROM yds.`{input_topic}`;''' \ + .format( + input_topic=self.input_topic, + output_topic=self.output_topic, + ) + + client.create_yds_connection(name="yds", database_id="FakeDatabaseId") + query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.STREAMING, + vcpu_time_limit=1).result.query_id + client.wait_query_status(query_id, fq.QueryMeta.RUNNING) + kikimr.compute_plane.wait_zero_checkpoint(query_id) + + data_1mb = ['1' * 1024 * 1024] + message_count = 15 + for _ in range(0, message_count): + self.write_stream(data_1mb) + self.read_stream(message_count) + + client.abort_query(query_id) + client.wait_query_status(query_id, fq.QueryMeta.ABORTED_BY_USER) + + stat = json.loads(client.describe_query(query_id).result.query.statistics.json) + + graph_name = "Graph=0" + ingress_bytes = stat[graph_name]["IngressBytes"]["sum"] + + assert ingress_bytes >= 15 * 1024 * 1024, "Ingress must be >= 15MB" + assert sum(kikimr.control_plane.get_metering()) == 10 diff --git a/ydb/tests/fq/yds/ya.make b/ydb/tests/fq/yds/ya.make index da9713bceaf9..a04690d9b7aa 100644 --- a/ydb/tests/fq/yds/ya.make +++ b/ydb/tests/fq/yds/ya.make @@ -28,6 +28,7 @@ TEST_SRCS( test_cpu_quota.py test_delete_read_rules_after_abort_by_system.py test_eval.py + test_kill_pq_bill.py test_mem_alloc.py test_metrics_cleanup.py test_pq_read_write.py