Skip to content

Commit

Permalink
Merge ab971bc into f1c6963
Browse files Browse the repository at this point in the history
  • Loading branch information
qyryq authored Dec 17, 2024
2 parents f1c6963 + ab971bc commit 9ce6312
Showing 1 changed file with 101 additions and 6 deletions.
107 changes: 101 additions & 6 deletions ydb/core/http_proxy/ut/ymq_ut.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@

#include <library/cpp/json/writer/json_value.h>
#include <library/cpp/testing/unittest/registar.h>
#include <library/cpp/scheme/scheme.h>
#include <library/cpp/string_utils/base64/base64.h>
#include <ydb/core/ymq/actor/metering.h>

#include <chrono>
#include <thread>
Expand Down Expand Up @@ -185,15 +187,108 @@ Y_UNIT_TEST_SUITE(TestYmqHttpProxy) {
auto json = CreateQueue({{"QueueName", "ExampleQueueName"}});
auto queueUrl = GetByPath<TString>(json, "QueueUrl");

json = SendMessage({
SendMessage({
{"QueueUrl", queueUrl},
{"MessageBody", "MessageBody-0"}
{"MessageBody", TString(1_KB, 'x')}, // 1 request unit
});

json = ReceiveMessage({
{"QueueUrl", queueUrl},
{"WaitTimeSeconds", 20},
});
UNIT_ASSERT_VALUES_EQUAL(json["Messages"].GetArray().size(), 1);

SendMessage({
{"QueueUrl", queueUrl},
{"MessageBody", TString(150_KB, 'x')}, // 3 request units
});

json = ReceiveMessage({
{"QueueUrl", queueUrl},
{"WaitTimeSeconds", 20},
});
UNIT_ASSERT_VALUES_EQUAL(json["Messages"].GetArray().size(), 1);

auto makeTags = [](TVector<std::pair<TString, TString>> pairs) {
NSc::TValue tags;
tags.SetDict();
for (auto const& [k, v] : pairs) {
tags[k] = v;
}
return tags;
};
auto makeRecord = [&makeTags](TString type, TString resourceId, size_t quantity, TVector<std::pair<TString, TString>> tags) {
return NKikimr::NSQS::CreateMeteringBillingRecord(
"folder4",
resourceId,
type,
"fqdn",
TInstant::Now(),
quantity,
type == "ymq.traffic.v1" ? "byte" : "request",
makeTags(tags)
);
};
auto asExpected = [](NSc::TValue record, NSc::TValue expected) {
return record["folder_id"] == expected["folder_id"] &&
record["resource_id"] == expected["resource_id"] &&
record["schema"] == expected["schema"] &&
record["usage"]["unit"] == expected["usage"]["unit"] &&
(record["schema"] != "ymq.requests.v1" || record["usage"]["quantity"] == expected["usage"]["quantity"]) &&
record["tags"]["direction"] == expected["tags"]["direction"] &&
record["tags"]["type"] == expected["tags"]["type"] &&
record["tags"]["queue_type"] == expected["tags"]["queue_type"];
};

// loadBillingRecords was copied from metering_ut.cpp.
// Probably, that file is a better place for this test.
auto loadBillingRecords = [](const TString& filepath) -> TVector<NSc::TValue> {
TString data = TFileInput(filepath).ReadAll();
auto rawRecords = SplitString(data, "\n");
TVector<NSc::TValue> records;
for (auto& record : rawRecords) {
records.push_back(NSc::TValue::FromJson(record));
}
return records;
};

TVector<NSc::TValue> expectedRecords{
// CreateQueue
makeRecord("ymq.traffic.v1", "", 0, {{"direction", "ingress"}, {"type", "inet"}}),
makeRecord("ymq.traffic.v1", "", 0, {{"direction", "egress"}, {"type", "inet"}}),
makeRecord("ymq.requests.v1", "", 1, {{"queue_type", "other"}}),

// SendMessage 1 KB
makeRecord("ymq.traffic.v1", "000000000000000101v0", 0, {{"direction", "ingress"}, {"type", "inet"}}),
makeRecord("ymq.traffic.v1", "000000000000000101v0", 0, {{"direction", "egress"}, {"type", "inet"}}),
makeRecord("ymq.requests.v1", "000000000000000101v0", 1, {{"queue_type", "std"}}),

// ReceiveMessage 1 KB
makeRecord("ymq.traffic.v1", "000000000000000101v0", 0, {{"direction", "ingress"}, {"type", "inet"}}),
makeRecord("ymq.traffic.v1", "000000000000000101v0", 0, {{"direction", "egress"}, {"type", "inet"}}),
makeRecord("ymq.requests.v1", "000000000000000101v0", 1, {{"queue_type", "std"}}),

// SendMessage 150 KB
makeRecord("ymq.traffic.v1", "000000000000000101v0", 0, {{"direction", "ingress"}, {"type", "inet"}}),
makeRecord("ymq.traffic.v1", "000000000000000101v0", 0, {{"direction", "egress"}, {"type", "inet"}}),
makeRecord("ymq.requests.v1", "000000000000000101v0", 3, {{"queue_type", "std"}}),

// ReceiveMessage 150 KB
makeRecord("ymq.traffic.v1", "000000000000000101v0", 0, {{"direction", "ingress"}, {"type", "inet"}}),
makeRecord("ymq.traffic.v1", "000000000000000101v0", 0, {{"direction", "egress"}, {"type", "inet"}}),
makeRecord("ymq.requests.v1", "000000000000000101v0", 3, {{"queue_type", "std"}}),
};

// TODO:
// Sleep(TDuration::Seconds(500));
// TVector<NSc::TValue> records = LoadBillingRecords(sqsConfig.GetMeteringLogFilePath());
// CheckBillingRecord(records, expectedRecords);
auto filePath = KikimrServer->ServerSettings->AppConfig->GetSqsConfig().GetMeteringLogFilePath();

TVector<NSc::TValue> records;
while (records.size() != expectedRecords.size()) {
Sleep(TDuration::Seconds(1));
records = loadBillingRecords(filePath);
}
for (size_t i = 0; i < records.size(); ++i) {
UNIT_ASSERT(asExpected(records[i], expectedRecords[i]));
}
}

Y_UNIT_TEST_F(TestSendMessageEmptyQueueUrl, THttpProxyTestMockForSQS) {
Expand Down

0 comments on commit 9ce6312

Please sign in to comment.