Skip to content

Commit

Permalink
sanitize query name label (ydb-platform#12746)
Browse files Browse the repository at this point in the history
  • Loading branch information
uzhastik authored Dec 18, 2024
1 parent db42db4 commit 3317b75
Show file tree
Hide file tree
Showing 11 changed files with 60 additions and 30 deletions.
4 changes: 2 additions & 2 deletions ydb/core/fq/libs/actors/pending_fetcher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
#include <ydb/core/fq/libs/control_plane_storage/control_plane_storage.h>
#include <ydb/core/fq/libs/control_plane_storage/events/events.h>
#include <ydb/core/fq/libs/events/events.h>
#include <ydb/core/fq/libs/metrics/sanitize_label.h>
#include <ydb/core/fq/libs/private_client/internal_service.h>

#include <ydb/library/actors/core/log.h>
Expand Down Expand Up @@ -360,8 +361,7 @@ class TPendingFetcher : public NActors::TActorBootstrapped<TPendingFetcher> {
const TString queryId = task.query_id().value();
const bool isStreaming = task.query_type() == FederatedQuery::QueryContent::STREAMING;
TString queryIdLabel;
// todo: sanitize query name
TString queryNameLabel = task.query_name();
TString queryNameLabel = SanitizeLabel(task.query_name());
if (task.automatic()) {
queryIdLabel = isStreaming ? "streaming" : "analytics";
} else if (isStreaming) {
Expand Down
1 change: 1 addition & 0 deletions ydb/core/fq/libs/actors/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ PEERDIR(
ydb/core/fq/libs/db_schema
ydb/core/fq/libs/events
ydb/core/fq/libs/grpc
ydb/core/fq/libs/metrics
ydb/core/fq/libs/private_client
ydb/core/fq/libs/rate_limiter/utils
ydb/core/fq/libs/result_formatter
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
#include "common.h"
#include "sanitize_label.h"

#include <util/generic/string.h>

namespace NFq {

TString CleanupCounterValueString(const TString& value) {
TString clean;
constexpr auto valueLenghtLimit = 200;
TString SanitizeLabel(const TString& value) {
TString result;
result.reserve(value.size());
constexpr auto labelLengthLimit = 200;

for (auto c : value) {
switch (c) {
Expand All @@ -19,13 +20,13 @@ TString CleanupCounterValueString(const TString& value) {
case '\\':
continue;
default:
clean.push_back(c);
if (clean.size() == valueLenghtLimit) {
break;
result.push_back(c);
if (result.size() == labelLengthLimit) {
return result;
}
}
}
return clean;
return result;
}

} // namespace NFq
9 changes: 9 additions & 0 deletions ydb/core/fq/libs/metrics/sanitize_label.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
#pragma once

#include <util/generic/fwd.h>

namespace NFq {

TString SanitizeLabel(const TString& value);

} // namespace NFq
29 changes: 29 additions & 0 deletions ydb/core/fq/libs/metrics/ut/sanitize_label_ut.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
#include <ydb/core/fq/libs/metrics/sanitize_label.h>

#include <library/cpp/testing/unittest/registar.h>

Y_UNIT_TEST_SUITE(SanitizeLable) {
Y_UNIT_TEST(Empty) {
UNIT_ASSERT_VALUES_EQUAL(NFq::SanitizeLabel(""), "");
}

Y_UNIT_TEST(SkipSingleBadSymbol) {
UNIT_ASSERT_VALUES_EQUAL(NFq::SanitizeLabel("|"), "");
UNIT_ASSERT_VALUES_EQUAL(NFq::SanitizeLabel("*"), "");
UNIT_ASSERT_VALUES_EQUAL(NFq::SanitizeLabel("?"), "");
UNIT_ASSERT_VALUES_EQUAL(NFq::SanitizeLabel("\""), "");
UNIT_ASSERT_VALUES_EQUAL(NFq::SanitizeLabel("'"), "");
UNIT_ASSERT_VALUES_EQUAL(NFq::SanitizeLabel("`"), "");
UNIT_ASSERT_VALUES_EQUAL(NFq::SanitizeLabel("\\"), "");
}

Y_UNIT_TEST(SkipBadSymbols) {
UNIT_ASSERT_VALUES_EQUAL(NFq::SanitizeLabel("a|b*c?d\"e'f`g\\h"), "abcdefgh");
}

Y_UNIT_TEST(Truncate200) {
TString s1(400, 'a');
TString s2(200, 'a');
UNIT_ASSERT_VALUES_EQUAL(NFq::SanitizeLabel(s1), s2);
}
}
3 changes: 1 addition & 2 deletions ydb/core/fq/libs/metrics/ut/ya.make
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
UNITTEST_FOR(ydb/core/fq/libs/metrics)

FORK_SUBTESTS()

IF (SANITIZER_TYPE OR WITH_VALGRIND)
SIZE(MEDIUM)
ENDIF()

SRCS(
metrics_ut.cpp
sanitize_label_ut.cpp
)

YQL_LAST_ABI_VERSION()
Expand Down
3 changes: 2 additions & 1 deletion ydb/core/fq/libs/metrics/ya.make
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
LIBRARY()

SRCS(
sanitize_label.cpp
status_code_counters.cpp
)

PEERDIR(
library/cpp/monlib/dynamic_counters
ydb/library/yql/dq/actors/protos
yql/essentials/public/issue
ydb/library/yql/dq/actors/protos
)

YQL_LAST_ABI_VERSION()
Expand Down
9 changes: 0 additions & 9 deletions ydb/core/fq/libs/row_dispatcher/common.h

This file was deleted.

6 changes: 3 additions & 3 deletions ydb/core/fq/libs/row_dispatcher/row_dispatcher.cpp
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
#include "row_dispatcher.h"

#include "actors_factory.h"
#include "common.h"
#include "coordinator.h"
#include "leader_election.h"

Expand All @@ -15,6 +14,7 @@

#include <ydb/core/fq/libs/actors/logging/log.h>
#include <ydb/core/fq/libs/events/events.h>
#include <ydb/core/fq/libs/metrics/sanitize_label.h>
#include <ydb/core/mon/mon.h>

#include <ydb/core/fq/libs/row_dispatcher/events/data_plane.h>
Expand Down Expand Up @@ -598,7 +598,7 @@ void TRowDispatcher::UpdateMetrics() {

void TRowDispatcher::SetQueryMetrics(const TQueryStatKey& queryKey, ui64 unreadBytesMax, ui64 unreadBytesAvg, i64 readLagMessagesMax) {
auto queryGroup = Metrics.Counters->GetSubgroup("query_id", queryKey.QueryId);
auto topicGroup = queryGroup->GetSubgroup("read_group", CleanupCounterValueString(queryKey.ReadGroup));
auto topicGroup = queryGroup->GetSubgroup("read_group", SanitizeLabel(queryKey.ReadGroup));
topicGroup->GetCounter("MaxUnreadBytes")->Set(unreadBytesMax);
topicGroup->GetCounter("AvgUnreadBytes")->Set(unreadBytesAvg);
topicGroup->GetCounter("MaxReadLag")->Set(readLagMessagesMax);
Expand Down Expand Up @@ -740,7 +740,7 @@ void TRowDispatcher::Handle(NFq::TEvRowDispatcher::TEvStartSession::TPtr& ev) {
LOG_ROW_DISPATCHER_DEBUG("Received TEvStartSession from " << ev->Sender << ", read group " << ev->Get()->Record.GetSource().GetReadGroup() << ", topicPath " << ev->Get()->Record.GetSource().GetTopicPath() <<
" part id " << ev->Get()->Record.GetPartitionId() << " query id " << ev->Get()->Record.GetQueryId() << " cookie " << ev->Cookie);
auto queryGroup = Metrics.Counters->GetSubgroup("query_id", ev->Get()->Record.GetQueryId());
auto topicGroup = queryGroup->GetSubgroup("read_group", CleanupCounterValueString(ev->Get()->Record.GetSource().GetReadGroup()));
auto topicGroup = queryGroup->GetSubgroup("read_group", SanitizeLabel(ev->Get()->Record.GetSource().GetReadGroup()));
topicGroup->GetCounter("StartSession", true)->Inc();

NodesTracker.AddNode(ev->Sender.NodeId());
Expand Down
7 changes: 3 additions & 4 deletions ydb/core/fq/libs/row_dispatcher/topic_session.cpp
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
#include "topic_session.h"

#include "common.h"

#include <ydb/core/fq/libs/actors/logging/log.h>
#include <ydb/core/fq/libs/metrics/sanitize_label.h>
#include <ydb/core/fq/libs/row_dispatcher/events/data_plane.h>
#include <ydb/core/fq/libs/row_dispatcher/format_handler/format_handler.h>

Expand All @@ -25,7 +24,7 @@ namespace {

struct TTopicSessionMetrics {
void Init(const ::NMonitoring::TDynamicCounterPtr& counters, const TString& topicPath, ui32 partitionId) {
TopicGroup = counters->GetSubgroup("topic", CleanupCounterValueString(topicPath));
TopicGroup = counters->GetSubgroup("topic", SanitizeLabel(topicPath));
AllSessionsDataRate = counters->GetCounter("AllSessionsDataRate", true);

PartitionGroup = TopicGroup->GetSubgroup("partition", ToString(partitionId));
Expand Down Expand Up @@ -107,7 +106,7 @@ class TTopicSession : public TActorBootstrapped<TTopicSession> {
}
Y_UNUSED(TDuration::TryParse(Settings.GetSource().GetReconnectPeriod(), ReconnectPeriod));
auto queryGroup = Counters->GetSubgroup("query_id", ev->Get()->Record.GetQueryId());
auto topicGroup = queryGroup->GetSubgroup("read_group", CleanupCounterValueString(readGroup));
auto topicGroup = queryGroup->GetSubgroup("read_group", SanitizeLabel(readGroup));
FilteredDataRate = topicGroup->GetCounter("FilteredDataRate", true);
RestartSessionByOffsetsByQuery = counters->GetCounter("RestartSessionByOffsetsByQuery", true);
}
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/fq/libs/row_dispatcher/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ LIBRARY()

SRCS(
actors_factory.cpp
common.cpp
coordinator.cpp
leader_election.cpp
row_dispatcher_service.cpp
Expand All @@ -13,6 +12,7 @@ SRCS(
PEERDIR(
ydb/core/fq/libs/actors/logging
ydb/core/fq/libs/config/protos
ydb/core/fq/libs/metrics
ydb/core/fq/libs/row_dispatcher/events
ydb/core/fq/libs/row_dispatcher/format_handler
ydb/core/fq/libs/row_dispatcher/purecalc_compilation
Expand Down

0 comments on commit 3317b75

Please sign in to comment.