Skip to content

Commit

Permalink
Fixed sesors path
Browse files Browse the repository at this point in the history
  • Loading branch information
GrigoriyPA committed Dec 13, 2024
1 parent 3eb9fa3 commit d60d359
Show file tree
Hide file tree
Showing 4 changed files with 15 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,12 @@ TString TSchemaColumn::ToString() const {
return TStringBuilder() << "'" << Name << "' : " << TypeYson;
}

//// TCountersDesc

TCountersDesc TCountersDesc::SetPath(const TString& countersPath) const {
TCountersDesc result(*this);
result.CountersPath = countersPath;
return result;
}

} // namespace NFq::NRowDispatcher
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ struct TCountersDesc {
NMonitoring::TDynamicCounterPtr CountersRoot = MakeIntrusive<NMonitoring::TDynamicCounters>();
NMonitoring::TDynamicCounterPtr CountersSubgroup = MakeIntrusive<NMonitoring::TDynamicCounters>();
TString CountersPath; // Used for counters created from CountersRoot

[[nodiscard]] TCountersDesc SetPath(const TString& countersPath) const;
};

} // namespace NFq::NRowDispatcher
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,7 @@ class TTopicFormatHandler : public NActors::TActor<TTopicFormatHandler>, public
public:
TTopicFormatHandler(const TFormatHandlerConfig& config, const TSettings& settings, const TCountersDesc& counters)
: TBase(&TTopicFormatHandler::StateFunc)
, TTypeParser(__LOCATION__, counters)
, TTypeParser(__LOCATION__, counters.SetPath("row_dispatcher"))
, Config(config)
, Settings(settings)
, LogPrefix(TStringBuilder() << "TTopicFormatHandler [" << Settings.ParsingFormat << "]: ")
Expand Down Expand Up @@ -487,11 +487,12 @@ class TTopicFormatHandler : public NActors::TActor<TTopicFormatHandler>, public
}

TValueStatus<ITopicParser::TPtr> CreateParserForFormat() const {
const auto& counters = Counters.Counters.SetPath("row_dispatcher_parser");
if (Settings.ParsingFormat == "raw") {
return CreateRawParser(ParserHandler, Counters.Counters);
return CreateRawParser(ParserHandler, counters);
}
if (Settings.ParsingFormat == "json_each_row") {
return CreateJsonParser(ParserHandler, Config.JsonParserConfig, Counters.Counters);
return CreateJsonParser(ParserHandler, Config.JsonParserConfig, counters);
}
return TStatus::Fail(EStatusId::INTERNAL_ERROR, TStringBuilder() << "Unsupported parsing format: " << Settings.ParsingFormat);
}
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/fq/libs/row_dispatcher/topic_session.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -726,7 +726,7 @@ void TTopicSession::Handle(NFq::TEvRowDispatcher::TEvStartSession::TPtr& ev) {
ActorContext(),
FormatHandlerConfig,
handlerSettings,
{.CountersRoot = CountersRoot, .CountersSubgroup = Metrics.PartitionGroup, .CountersPath = "topic_session"}
{.CountersRoot = CountersRoot, .CountersSubgroup = Metrics.PartitionGroup}
)}).first;
}

Expand Down

0 comments on commit d60d359

Please sign in to comment.