Skip to content

Commit

Permalink
YMQ JSON API: send metering events (24-3) (#12635)
Browse files Browse the repository at this point in the history
  • Loading branch information
qyryq authored Dec 16, 2024
1 parent df8f59f commit 030b601
Show file tree
Hide file tree
Showing 5 changed files with 116 additions and 10 deletions.
40 changes: 37 additions & 3 deletions ydb/core/http_proxy/http_req.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
#include <ydb/library/folder_service/events.h>

#include <ydb/core/ymq/actor/auth_multi_factory.h>
#include <ydb/core/ymq/actor/serviceid.h>

#include <ydb/library/http_proxy/error/error.h>

Expand Down Expand Up @@ -438,8 +439,8 @@ namespace NKikimr::NHttpProxy {
<< " CloudId: " << ev->Get()->CloudId
<< " UserSid: " << ev->Get()->Sid;
);
FolderId = ev->Get()->FolderId;
CloudId = ev->Get()->CloudId;
HttpContext.FolderId = FolderId = ev->Get()->FolderId;
HttpContext.CloudId = CloudId = ev->Get()->CloudId;
UserSid = ev->Get()->Sid;
SendGrpcRequestNoDriver(ctx);
} else {
Expand Down Expand Up @@ -473,7 +474,8 @@ namespace NKikimr::NHttpProxy {
return ReplyWithError(ctx, NYdb::EStatus::BAD_REQUEST, "Invalid queue url");
}
CloudId = cloudIdAndResourceId.first;
ResourceId = cloudIdAndResourceId.second;
HttpContext.ResourceId = ResourceId = cloudIdAndResourceId.second;
HttpContext.ResponseData.YmqIsFifo = queueUrl.EndsWith(".fifo");
}
} catch (const NKikimr::NSQS::TSQSException& e) {
NYds::EErrorCodes issueCode = NYds::EErrorCodes::OK;
Expand Down Expand Up @@ -1248,6 +1250,38 @@ namespace NKikimr::NHttpProxy {
ResponseData.DumpBody(ContentType)
);

if (ResponseData.IsYmq && ServiceConfig.GetHttpConfig().GetYandexCloudMode()) {
// Send request attributes to the metering actor
auto reportRequestAttributes = MakeHolder<NSQS::TSqsEvents::TEvReportProcessedRequestAttributes>();

auto& requestAttributes = reportRequestAttributes->Data;

requestAttributes.HttpStatusCode = httpCode;
requestAttributes.IsFifo = ResponseData.YmqIsFifo;
requestAttributes.FolderId = FolderId;
requestAttributes.RequestSizeInBytes = Request->Size();
requestAttributes.ResponseSizeInBytes = response->Size();
requestAttributes.SourceAddress = SourceAddress;
requestAttributes.ResourceId = ResourceId;
requestAttributes.Action = NSQS::ActionFromString(MethodName);

LOG_SP_DEBUG_S(
ctx,
NKikimrServices::HTTP_PROXY,
TStringBuilder() << "Send metering event."
<< " HttpStatusCode: " << requestAttributes.HttpStatusCode
<< " IsFifo: " << requestAttributes.IsFifo
<< " FolderId: " << requestAttributes.FolderId
<< " RequestSizeInBytes: " << requestAttributes.RequestSizeInBytes
<< " ResponseSizeInBytes: " << requestAttributes.ResponseSizeInBytes
<< " SourceAddress: " << requestAttributes.SourceAddress
<< " ResourceId: " << requestAttributes.ResourceId
<< " Action: " << requestAttributes.Action
);

ctx.Send(NSQS::MakeSqsMeteringServiceID(), reportRequestAttributes.Release());
}

ctx.Send(Sender, new NHttp::TEvHttpProxy::TEvHttpOutgoingResponse(response));
}

Expand Down
2 changes: 2 additions & 0 deletions ydb/core/http_proxy/http_req.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ struct THttpResponseData {
TString ErrorText{"OK"};
TString YmqStatusCode;
ui32 YmqHttpCode;
bool YmqIsFifo;

TString DumpBody(MimeTypes contentType);
};
Expand All @@ -83,6 +84,7 @@ struct THttpRequestContext {
TString FolderId; // not in context
TString CloudId; // not in context
TString StreamName; // not in context
TString ResourceId;
TString SourceAddress;
TString MethodName; // used once
TString ApiVersion; // used once
Expand Down
43 changes: 36 additions & 7 deletions ydb/core/http_proxy/ut/datastreams_fixture.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#include <ydb/library/grpc/server/actors/logger.h>
#include <library/cpp/http/misc/parsed_request.h>
#include <library/cpp/json/json_writer.h>
#include <library/cpp/logger/global/global.h>
#include <library/cpp/resource/resource.h>
#include <library/cpp/testing/unittest/registar.h>

Expand Down Expand Up @@ -81,10 +82,10 @@ class THttpProxyTestMock : public NUnitTest::TBaseFixture {
InitAll();
}

void InitAll(bool yandexCloudMode = true) {
void InitAll(bool yandexCloudMode = true, bool enableMetering = false) {
AccessServicePort = PortManager.GetPort(8443);
AccessServiceEndpoint = "127.0.0.1:" + ToString(AccessServicePort);
InitKikimr(yandexCloudMode);
InitKikimr(yandexCloudMode, enableMetering);
InitAccessServiceService();
InitHttpServer(yandexCloudMode);
}
Expand Down Expand Up @@ -365,7 +366,7 @@ class THttpProxyTestMock : public NUnitTest::TBaseFixture {
return resultSet;
}

void InitKikimr(bool yandexCloudMode) {
void InitKikimr(bool yandexCloudMode, bool enableMetering) {
AuthFactory = std::make_shared<TIamAuthFactory>();
NKikimrConfig::TAppConfig appConfig;
appConfig.MutablePQConfig()->SetTopicsAreFirstClassCitizen(true);
Expand All @@ -379,6 +380,21 @@ class THttpProxyTestMock : public NUnitTest::TBaseFixture {
appConfig.MutableSqsConfig()->SetYandexCloudMode(yandexCloudMode);
appConfig.MutableSqsConfig()->SetEnableDeadLetterQueues(true);

if (enableMetering) {
auto& sqsConfig = *appConfig.MutableSqsConfig();

sqsConfig.SetMeteringFlushingIntervalMs(100);
sqsConfig.SetMeteringLogFilePath("sqs_metering.log");
TFsPath(sqsConfig.GetMeteringLogFilePath()).DeleteIfExists();

sqsConfig.AddMeteringCloudNetCidr("5.45.196.0/24");
sqsConfig.AddMeteringCloudNetCidr("2a0d:d6c0::/29");
sqsConfig.AddMeteringYandexNetCidr("127.0.0.0/8");
sqsConfig.AddMeteringYandexNetCidr("5.45.217.0/24");

DoInitGlobalLog(CreateOwningThreadedLogBackend(sqsConfig.GetMeteringLogFilePath(), 0));
}

auto limit = appConfig.MutablePQConfig()->AddValidRetentionLimits();
limit->SetMinPeriodSeconds(0);
limit->SetMaxPeriodSeconds(TDuration::Days(1).Seconds());
Expand Down Expand Up @@ -414,6 +430,13 @@ class THttpProxyTestMock : public NUnitTest::TBaseFixture {
ActorRuntime->SetLogPriority(NActorsServices::EServiceCommon::HTTP, NLog::PRI_DEBUG);
ActorRuntime->SetLogPriority(NKikimrServices::TICKET_PARSER, NLog::PRI_TRACE);

if (enableMetering) {
ActorRuntime->RegisterService(
NSQS::MakeSqsMeteringServiceID(),
ActorRuntime->Register(NSQS::CreateSqsMeteringService())
);
}

NYdb::TClient client(*(KikimrServer->ServerSettings));
UNIT_ASSERT_VALUES_EQUAL(NMsgBusProxy::MSTATUS_OK,
client.AlterUserAttributes("/", "Root", {{"folder_id", "folder4"},
Expand Down Expand Up @@ -477,7 +500,7 @@ class THttpProxyTestMock : public NUnitTest::TBaseFixture {
);

client.MkDir("/Root/SQS", ".FIFO");
client.CreateTable("/Root/SQS/.FIFO",
client.CreateTable("/Root/SQS/.FIFO",
"Name: \"Messages\""
"Columns { Name: \"QueueIdNumberHash\" Type: \"Uint64\"}"
"Columns { Name: \"QueueIdNumber\" Type: \"Uint64\"}"
Expand Down Expand Up @@ -537,7 +560,7 @@ class THttpProxyTestMock : public NUnitTest::TBaseFixture {
"KeyColumnNames: [\"Account\", \"QueueName\", \"EventType\"]"
);

auto stateTableCommon =
auto stateTableCommon =
"Name: \"State\""
"Columns { Name: \"QueueIdNumberHash\" Type: \"Uint64\"}"
"Columns { Name: \"QueueIdNumber\" Type: \"Uint64\"}"
Expand Down Expand Up @@ -581,7 +604,7 @@ class THttpProxyTestMock : public NUnitTest::TBaseFixture {
"KeyColumnNames: [\"QueueIdNumberAndShardHash\", \"QueueIdNumber\", \"Shard\", \"Offset\"]"
);

auto sentTimestampIdxCommonColumns=
auto sentTimestampIdxCommonColumns=
"Columns { Name: \"QueueIdNumberAndShardHash\" Type: \"Uint64\"}"
"Columns { Name: \"QueueIdNumber\" Type: \"Uint64\"}"
"Columns { Name: \"Shard\" Type: \"Uint32\"}"
Expand Down Expand Up @@ -700,7 +723,7 @@ class THttpProxyTestMock : public NUnitTest::TBaseFixture {
folderServiceConfig.SetEnable(false);
actorId = as->Register(NKikimr::NFolderService::CreateFolderServiceActor(folderServiceConfig, "cloud4"));
as->RegisterLocalService(NFolderService::FolderServiceActorId(), actorId);

actorId = as->Register(NKikimr::NFolderService::CreateFolderServiceActor(folderServiceConfig, "cloud4"));
as->RegisterLocalService(NSQS::MakeSqsFolderServiceID(), actorId);

Expand Down Expand Up @@ -766,3 +789,9 @@ class THttpProxyTestMockForSQS : public THttpProxyTestMock {
InitAll(false);
}
};

class THttpProxyTestMockWithMetering : public THttpProxyTestMock {
void SetUp(NUnitTest::TTestContext&) override {
InitAll(true, true);
}
};
29 changes: 29 additions & 0 deletions ydb/core/http_proxy/ut/http_proxy_ut.h
Original file line number Diff line number Diff line change
Expand Up @@ -1663,6 +1663,35 @@ Y_UNIT_TEST_SUITE(TestHttpProxy) {
UNIT_ASSERT(!GetByPath<TString>(json, "MessageId").empty());
}

Y_UNIT_TEST_F(BillingRecordsForJsonApi, THttpProxyTestMockWithMetering) {
auto createQueueReq = CreateSqsCreateQueueRequest();
auto res = SendHttpRequest("/Root", "AmazonSQS.CreateQueue", std::move(createQueueReq), FormAuthorizationStr("ru-central1"));
UNIT_ASSERT_VALUES_EQUAL(res.HttpCode, 200);
NJson::TJsonValue json;
UNIT_ASSERT(NJson::ReadJsonTree(res.Body, &json));
TString queueUrl = GetByPath<TString>(json, "QueueUrl");
UNIT_ASSERT(queueUrl.EndsWith("ExampleQueueName"));

NJson::TJsonValue sendMessageReq;
sendMessageReq["QueueUrl"] = queueUrl;
auto body = "MessageBody-0";
sendMessageReq["MessageBody"] = body;
sendMessageReq["MessageDeduplicationId"] = "MessageDeduplicationId-0";
sendMessageReq["MessageGroupId"] = "MessageGroupId-0";

res = SendHttpRequest("/Root", "AmazonSQS.SendMessage", std::move(sendMessageReq), FormAuthorizationStr("ru-central1"));
UNIT_ASSERT_VALUES_EQUAL(res.HttpCode, 200);
UNIT_ASSERT(NJson::ReadJsonTree(res.Body, &json));
UNIT_ASSERT(!GetByPath<TString>(json, "SequenceNumber").empty());
UNIT_ASSERT(!GetByPath<TString>(json, "MD5OfMessageBody").empty());
UNIT_ASSERT(!GetByPath<TString>(json, "MessageId").empty());

// TODO:
// Sleep(TDuration::Seconds(500));
// TVector<NSc::TValue> records = LoadBillingRecords(sqsConfig.GetMeteringLogFilePath());
// CheckBillingRecord(records, expectedRecords);
}

Y_UNIT_TEST_F(TestSendMessageEmptyQueueUrl, THttpProxyTestMockForSQS) {
NJson::TJsonValue sendMessageReq;
sendMessageReq["QueueUrl"] = "";
Expand Down
12 changes: 12 additions & 0 deletions ydb/core/ymq/http/http.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,18 @@ void THttpRequest::WriteResponse(const TReplyParams& replyParams, const TSqsHttp
requestAttributes.ResourceId = response.ResourceId;
requestAttributes.Action = Action_;

RLOG_SQS_BASE_DEBUG(*Parent_->ActorSystem_,
TStringBuilder() << "Send metering event."
<< " HttpStatusCode: " << requestAttributes.HttpStatusCode
<< " IsFifo: " << requestAttributes.IsFifo
<< " FolderId: " << requestAttributes.FolderId
<< " RequestSizeInBytes: " << requestAttributes.RequestSizeInBytes
<< " ResponseSizeInBytes: " << requestAttributes.ResponseSizeInBytes
<< " SourceAddress: " << requestAttributes.SourceAddress
<< " ResourceId: " << requestAttributes.ResourceId
<< " Action: " << requestAttributes.Action
);

Parent_->ActorSystem_->Send(MakeSqsMeteringServiceID(), reportRequestAttributes.Release());
}

Expand Down

0 comments on commit 030b601

Please sign in to comment.