From eb92fb32d26ff48824211f8880f02b2ed981e709 Mon Sep 17 00:00:00 2001 From: Sergey Skovorodkin Date: Mon, 16 Dec 2024 08:50:11 +0000 Subject: [PATCH] YMQ JSON API: send metering events --- ydb/core/http_proxy/http_req.cpp | 40 ++++++++++++++++-- ydb/core/http_proxy/http_req.h | 2 + ydb/core/http_proxy/ut/datastreams_fixture.h | 43 ++++++++++++++++---- ydb/core/http_proxy/ut/http_proxy_ut.h | 29 +++++++++++++ ydb/core/ymq/http/http.cpp | 12 ++++++ 5 files changed, 116 insertions(+), 10 deletions(-) diff --git a/ydb/core/http_proxy/http_req.cpp b/ydb/core/http_proxy/http_req.cpp index 19ef8330767e..d7685cdc7d77 100644 --- a/ydb/core/http_proxy/http_req.cpp +++ b/ydb/core/http_proxy/http_req.cpp @@ -62,6 +62,7 @@ #include #include +#include #include @@ -438,8 +439,8 @@ namespace NKikimr::NHttpProxy { << " CloudId: " << ev->Get()->CloudId << " UserSid: " << ev->Get()->Sid; ); - FolderId = ev->Get()->FolderId; - CloudId = ev->Get()->CloudId; + HttpContext.FolderId = FolderId = ev->Get()->FolderId; + HttpContext.CloudId = CloudId = ev->Get()->CloudId; UserSid = ev->Get()->Sid; SendGrpcRequestNoDriver(ctx); } else { @@ -473,7 +474,8 @@ namespace NKikimr::NHttpProxy { return ReplyWithError(ctx, NYdb::EStatus::BAD_REQUEST, "Invalid queue url"); } CloudId = cloudIdAndResourceId.first; - ResourceId = cloudIdAndResourceId.second; + HttpContext.ResourceId = ResourceId = cloudIdAndResourceId.second; + HttpContext.ResponseData.YmqIsFifo = queueUrl.EndsWith(".fifo"); } } catch (const NKikimr::NSQS::TSQSException& e) { NYds::EErrorCodes issueCode = NYds::EErrorCodes::OK; @@ -1248,6 +1250,38 @@ namespace NKikimr::NHttpProxy { ResponseData.DumpBody(ContentType) ); + if (ResponseData.IsYmq && ServiceConfig.GetHttpConfig().GetYandexCloudMode()) { + // Send request attributes to the metering actor + auto reportRequestAttributes = MakeHolder(); + + auto& requestAttributes = reportRequestAttributes->Data; + + requestAttributes.HttpStatusCode = httpCode; + requestAttributes.IsFifo = ResponseData.YmqIsFifo; + requestAttributes.FolderId = FolderId; + requestAttributes.RequestSizeInBytes = Request->Size(); + requestAttributes.ResponseSizeInBytes = response->Size(); + requestAttributes.SourceAddress = SourceAddress; + requestAttributes.ResourceId = ResourceId; + requestAttributes.Action = NSQS::ActionFromString(MethodName); + + LOG_SP_DEBUG_S( + ctx, + NKikimrServices::HTTP_PROXY, + TStringBuilder() << "Send metering event." + << " HttpStatusCode: " << requestAttributes.HttpStatusCode + << " IsFifo: " << requestAttributes.IsFifo + << " FolderId: " << requestAttributes.FolderId + << " RequestSizeInBytes: " << requestAttributes.RequestSizeInBytes + << " ResponseSizeInBytes: " << requestAttributes.ResponseSizeInBytes + << " SourceAddress: " << requestAttributes.SourceAddress + << " ResourceId: " << requestAttributes.ResourceId + << " Action: " << requestAttributes.Action + ); + + ctx.Send(NSQS::MakeSqsMeteringServiceID(), reportRequestAttributes.Release()); + } + ctx.Send(Sender, new NHttp::TEvHttpProxy::TEvHttpOutgoingResponse(response)); } diff --git a/ydb/core/http_proxy/http_req.h b/ydb/core/http_proxy/http_req.h index e7dbe4d2e9e9..7683f7634f19 100644 --- a/ydb/core/http_proxy/http_req.h +++ b/ydb/core/http_proxy/http_req.h @@ -58,6 +58,7 @@ struct THttpResponseData { TString ErrorText{"OK"}; TString YmqStatusCode; ui32 YmqHttpCode; + bool YmqIsFifo; TString DumpBody(MimeTypes contentType); }; @@ -83,6 +84,7 @@ struct THttpRequestContext { TString FolderId; // not in context TString CloudId; // not in context TString StreamName; // not in context + TString ResourceId; TString SourceAddress; TString MethodName; // used once TString ApiVersion; // used once diff --git a/ydb/core/http_proxy/ut/datastreams_fixture.h b/ydb/core/http_proxy/ut/datastreams_fixture.h index 741ad480a9e6..cc994db7a6dd 100644 --- a/ydb/core/http_proxy/ut/datastreams_fixture.h +++ b/ydb/core/http_proxy/ut/datastreams_fixture.h @@ -5,6 +5,7 @@ #include #include #include +#include #include #include @@ -81,10 +82,10 @@ class THttpProxyTestMock : public NUnitTest::TBaseFixture { InitAll(); } - void InitAll(bool yandexCloudMode = true) { + void InitAll(bool yandexCloudMode = true, bool enableMetering = false) { AccessServicePort = PortManager.GetPort(8443); AccessServiceEndpoint = "127.0.0.1:" + ToString(AccessServicePort); - InitKikimr(yandexCloudMode); + InitKikimr(yandexCloudMode, enableMetering); InitAccessServiceService(); InitHttpServer(yandexCloudMode); } @@ -365,7 +366,7 @@ class THttpProxyTestMock : public NUnitTest::TBaseFixture { return resultSet; } - void InitKikimr(bool yandexCloudMode) { + void InitKikimr(bool yandexCloudMode, bool enableMetering) { AuthFactory = std::make_shared(); NKikimrConfig::TAppConfig appConfig; appConfig.MutablePQConfig()->SetTopicsAreFirstClassCitizen(true); @@ -379,6 +380,21 @@ class THttpProxyTestMock : public NUnitTest::TBaseFixture { appConfig.MutableSqsConfig()->SetYandexCloudMode(yandexCloudMode); appConfig.MutableSqsConfig()->SetEnableDeadLetterQueues(true); + if (enableMetering) { + auto& sqsConfig = *appConfig.MutableSqsConfig(); + + sqsConfig.SetMeteringFlushingIntervalMs(100); + sqsConfig.SetMeteringLogFilePath("sqs_metering.log"); + TFsPath(sqsConfig.GetMeteringLogFilePath()).DeleteIfExists(); + + sqsConfig.AddMeteringCloudNetCidr("5.45.196.0/24"); + sqsConfig.AddMeteringCloudNetCidr("2a0d:d6c0::/29"); + sqsConfig.AddMeteringYandexNetCidr("127.0.0.0/8"); + sqsConfig.AddMeteringYandexNetCidr("5.45.217.0/24"); + + DoInitGlobalLog(CreateOwningThreadedLogBackend(sqsConfig.GetMeteringLogFilePath(), 0)); + } + auto limit = appConfig.MutablePQConfig()->AddValidRetentionLimits(); limit->SetMinPeriodSeconds(0); limit->SetMaxPeriodSeconds(TDuration::Days(1).Seconds()); @@ -414,6 +430,13 @@ class THttpProxyTestMock : public NUnitTest::TBaseFixture { ActorRuntime->SetLogPriority(NActorsServices::EServiceCommon::HTTP, NLog::PRI_DEBUG); ActorRuntime->SetLogPriority(NKikimrServices::TICKET_PARSER, NLog::PRI_TRACE); + if (enableMetering) { + ActorRuntime->RegisterService( + NSQS::MakeSqsMeteringServiceID(), + ActorRuntime->Register(NSQS::CreateSqsMeteringService()) + ); + } + NYdb::TClient client(*(KikimrServer->ServerSettings)); UNIT_ASSERT_VALUES_EQUAL(NMsgBusProxy::MSTATUS_OK, client.AlterUserAttributes("/", "Root", {{"folder_id", "folder4"}, @@ -477,7 +500,7 @@ class THttpProxyTestMock : public NUnitTest::TBaseFixture { ); client.MkDir("/Root/SQS", ".FIFO"); - client.CreateTable("/Root/SQS/.FIFO", + client.CreateTable("/Root/SQS/.FIFO", "Name: \"Messages\"" "Columns { Name: \"QueueIdNumberHash\" Type: \"Uint64\"}" "Columns { Name: \"QueueIdNumber\" Type: \"Uint64\"}" @@ -537,7 +560,7 @@ class THttpProxyTestMock : public NUnitTest::TBaseFixture { "KeyColumnNames: [\"Account\", \"QueueName\", \"EventType\"]" ); - auto stateTableCommon = + auto stateTableCommon = "Name: \"State\"" "Columns { Name: \"QueueIdNumberHash\" Type: \"Uint64\"}" "Columns { Name: \"QueueIdNumber\" Type: \"Uint64\"}" @@ -581,7 +604,7 @@ class THttpProxyTestMock : public NUnitTest::TBaseFixture { "KeyColumnNames: [\"QueueIdNumberAndShardHash\", \"QueueIdNumber\", \"Shard\", \"Offset\"]" ); - auto sentTimestampIdxCommonColumns= + auto sentTimestampIdxCommonColumns= "Columns { Name: \"QueueIdNumberAndShardHash\" Type: \"Uint64\"}" "Columns { Name: \"QueueIdNumber\" Type: \"Uint64\"}" "Columns { Name: \"Shard\" Type: \"Uint32\"}" @@ -700,7 +723,7 @@ class THttpProxyTestMock : public NUnitTest::TBaseFixture { folderServiceConfig.SetEnable(false); actorId = as->Register(NKikimr::NFolderService::CreateFolderServiceActor(folderServiceConfig, "cloud4")); as->RegisterLocalService(NFolderService::FolderServiceActorId(), actorId); - + actorId = as->Register(NKikimr::NFolderService::CreateFolderServiceActor(folderServiceConfig, "cloud4")); as->RegisterLocalService(NSQS::MakeSqsFolderServiceID(), actorId); @@ -766,3 +789,9 @@ class THttpProxyTestMockForSQS : public THttpProxyTestMock { InitAll(false); } }; + +class THttpProxyTestMockWithMetering : public THttpProxyTestMock { + void SetUp(NUnitTest::TTestContext&) override { + InitAll(true, true); + } +}; diff --git a/ydb/core/http_proxy/ut/http_proxy_ut.h b/ydb/core/http_proxy/ut/http_proxy_ut.h index abd778e1cd48..e9869978c2c8 100644 --- a/ydb/core/http_proxy/ut/http_proxy_ut.h +++ b/ydb/core/http_proxy/ut/http_proxy_ut.h @@ -1663,6 +1663,35 @@ Y_UNIT_TEST_SUITE(TestHttpProxy) { UNIT_ASSERT(!GetByPath(json, "MessageId").empty()); } + Y_UNIT_TEST_F(BillingRecordsForJsonApi, THttpProxyTestMockWithMetering) { + auto createQueueReq = CreateSqsCreateQueueRequest(); + auto res = SendHttpRequest("/Root", "AmazonSQS.CreateQueue", std::move(createQueueReq), FormAuthorizationStr("ru-central1")); + UNIT_ASSERT_VALUES_EQUAL(res.HttpCode, 200); + NJson::TJsonValue json; + UNIT_ASSERT(NJson::ReadJsonTree(res.Body, &json)); + TString queueUrl = GetByPath(json, "QueueUrl"); + UNIT_ASSERT(queueUrl.EndsWith("ExampleQueueName")); + + NJson::TJsonValue sendMessageReq; + sendMessageReq["QueueUrl"] = queueUrl; + auto body = "MessageBody-0"; + sendMessageReq["MessageBody"] = body; + sendMessageReq["MessageDeduplicationId"] = "MessageDeduplicationId-0"; + sendMessageReq["MessageGroupId"] = "MessageGroupId-0"; + + res = SendHttpRequest("/Root", "AmazonSQS.SendMessage", std::move(sendMessageReq), FormAuthorizationStr("ru-central1")); + UNIT_ASSERT_VALUES_EQUAL(res.HttpCode, 200); + UNIT_ASSERT(NJson::ReadJsonTree(res.Body, &json)); + UNIT_ASSERT(!GetByPath(json, "SequenceNumber").empty()); + UNIT_ASSERT(!GetByPath(json, "MD5OfMessageBody").empty()); + UNIT_ASSERT(!GetByPath(json, "MessageId").empty()); + + // TODO: + // Sleep(TDuration::Seconds(500)); + // TVector records = LoadBillingRecords(sqsConfig.GetMeteringLogFilePath()); + // CheckBillingRecord(records, expectedRecords); + } + Y_UNIT_TEST_F(TestSendMessageEmptyQueueUrl, THttpProxyTestMockForSQS) { NJson::TJsonValue sendMessageReq; sendMessageReq["QueueUrl"] = ""; diff --git a/ydb/core/ymq/http/http.cpp b/ydb/core/ymq/http/http.cpp index 7cb47162509a..b98b124ec709 100644 --- a/ydb/core/ymq/http/http.cpp +++ b/ydb/core/ymq/http/http.cpp @@ -164,6 +164,18 @@ void THttpRequest::WriteResponse(const TReplyParams& replyParams, const TSqsHttp requestAttributes.ResourceId = response.ResourceId; requestAttributes.Action = Action_; + RLOG_SQS_BASE_DEBUG(*Parent_->ActorSystem_, + TStringBuilder() << "Send metering event." + << " HttpStatusCode: " << requestAttributes.HttpStatusCode + << " IsFifo: " << requestAttributes.IsFifo + << " FolderId: " << requestAttributes.FolderId + << " RequestSizeInBytes: " << requestAttributes.RequestSizeInBytes + << " ResponseSizeInBytes: " << requestAttributes.ResponseSizeInBytes + << " SourceAddress: " << requestAttributes.SourceAddress + << " ResourceId: " << requestAttributes.ResourceId + << " Action: " << requestAttributes.Action + ); + Parent_->ActorSystem_->Send(MakeSqsMeteringServiceID(), reportRequestAttributes.Release()); }