Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

LOGBROKER 8891 fix not using optional fields in sqs json api properly #8523

Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
136 changes: 135 additions & 1 deletion ydb/core/http_proxy/ut/http_proxy_ut.h
Original file line number Diff line number Diff line change
Expand Up @@ -1824,7 +1824,6 @@ Y_UNIT_TEST_SUITE(TestHttpProxy) {
res = SendHttpRequest("/Root", "AmazonSQS.DeleteQueue", std::move(deleteQueueReq), FormAuthorizationStr("ru-central1"));
UNIT_ASSERT_VALUES_EQUAL(res.HttpCode, 200);


for (int i = 0; i < 61; ++i) {
req = CreateSqsGetQueueUrlRequest();
res = SendHttpRequest("/Root", "AmazonSQS.GetQueueUrl", std::move(req), FormAuthorizationStr("ru-central1"));
Expand Down Expand Up @@ -2042,4 +2041,139 @@ Y_UNIT_TEST_SUITE(TestHttpProxy) {
UNIT_ASSERT_VALUES_EQUAL(json["QueueUrls"][0], resultQueueUrl);
}

Y_UNIT_TEST_F(TestChangeMessageVisibility, THttpProxyTestMock) {
auto createQueueReq = CreateSqsCreateQueueRequest();
auto res = SendHttpRequest("/Root", "AmazonSQS.CreateQueue", std::move(createQueueReq), FormAuthorizationStr("ru-central1"));
UNIT_ASSERT_VALUES_EQUAL(res.HttpCode, 200);
NJson::TJsonValue json;
UNIT_ASSERT(NJson::ReadJsonTree(res.Body, &json));
TString resultQueueUrl = GetByPath<TString>(json, "QueueUrl");

NJson::TJsonValue sendMessageReq;
sendMessageReq["QueueUrl"] = resultQueueUrl;
auto body = "MessageBody-0";
sendMessageReq["MessageBody"] = body;

res = SendHttpRequest("/Root", "AmazonSQS.SendMessage", std::move(sendMessageReq), FormAuthorizationStr("ru-central1"));
UNIT_ASSERT_VALUES_EQUAL(res.HttpCode, 200);

for (int i = 0; i < 20; ++i) {
NJson::TJsonValue receiveMessageReq;
receiveMessageReq["QueueUrl"] = resultQueueUrl;
res = SendHttpRequest("/Root", "AmazonSQS.ReceiveMessage", std::move(receiveMessageReq), FormAuthorizationStr("ru-central1"));
if (res.Body != TString("{}")) {
break;
}
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
}

UNIT_ASSERT_VALUES_EQUAL(res.HttpCode, 200);

UNIT_ASSERT(NJson::ReadJsonTree(res.Body, &json));

auto receiptHandle = json["Messages"][0]["ReceiptHandle"].GetString();
UNIT_ASSERT(!receiptHandle.Empty());

NJson::TJsonValue changeMessageVisibility;
changeMessageVisibility["QueueUrl"] = resultQueueUrl;
changeMessageVisibility["ReceiptHandle"] = receiptHandle;
changeMessageVisibility["VisibilityTimeout"] = 1;

res = SendHttpRequest(
"/Root",
"AmazonSQS.ChangeMessageVisibility",
std::move(changeMessageVisibility),
FormAuthorizationStr("ru-central1")
);

UNIT_ASSERT_VALUES_EQUAL(res.HttpCode, 200);
}

Y_UNIT_TEST_F(TestChangeMessageVisibilityBatch, THttpProxyTestMock) {
auto createQueueReq = CreateSqsCreateQueueRequest();
auto res = SendHttpRequest("/Root", "AmazonSQS.CreateQueue", std::move(createQueueReq), FormAuthorizationStr("ru-central1"));
UNIT_ASSERT_VALUES_EQUAL(res.HttpCode, 200);
NJson::TJsonValue json;
UNIT_ASSERT(NJson::ReadJsonTree(res.Body, &json));
TString resultQueueUrl = GetByPath<TString>(json, "QueueUrl");
UNIT_ASSERT(resultQueueUrl.EndsWith("ExampleQueueName"));

NJson::TJsonValue message0;
message0["Id"] = "Id-0";
message0["MessageBody"] = "MessageBody-0";
message0["MessageDeduplicationId"] = "MessageDeduplicationId-0";

NJson::TJsonValue message1;
message1["Id"] = "Id-1";
message1["MessageBody"] = "MessageBody-1";
message1["MessageDeduplicationId"] = "MessageDeduplicationId-1";

NJson::TJsonArray entries = {message0, message1};

NJson::TJsonValue sendMessageBatchReq;
sendMessageBatchReq["QueueUrl"] = resultQueueUrl;
sendMessageBatchReq["Entries"] = entries;

res = SendHttpRequest("/Root", "AmazonSQS.SendMessageBatch", std::move(sendMessageBatchReq), FormAuthorizationStr("ru-central1"));
UNIT_ASSERT_VALUES_EQUAL(res.HttpCode, 200);
UNIT_ASSERT(NJson::ReadJsonTree(res.Body, &json));
UNIT_ASSERT(json["Successful"].GetArray().size() == 2);

TVector<NJson::TJsonValue> messages;
for (int i = 0; i < 20; ++i) {
NJson::TJsonValue receiveMessageReq;
receiveMessageReq["QueueUrl"] = resultQueueUrl;
res = SendHttpRequest("/Root", "AmazonSQS.ReceiveMessage", std::move(receiveMessageReq), FormAuthorizationStr("ru-central1"));
if (res.Body != TString("{}")) {
NJson::ReadJsonTree(res.Body, &json);
if (json["Messages"].GetArray().size() == 2) {
messages.push_back(json["Messages"][0]);
messages.push_back(json["Messages"][1]);
break;
}
if (json["Messages"].GetArray().size() == 1) {
messages.push_back(json["Messages"][0]);
if (messages.size() == 2) {
break;
}
}
}
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
}

UNIT_ASSERT_VALUES_EQUAL(messages.size(), 2);

auto receiptHandle0 = messages[0]["ReceiptHandle"].GetString();
UNIT_ASSERT(!receiptHandle0.Empty());
auto receiptHandle1 = messages[1]["ReceiptHandle"].GetString();
UNIT_ASSERT(!receiptHandle1.Empty());


NJson::TJsonValue changeMessageVisibilityBatchReq;
changeMessageVisibilityBatchReq["QueueUrl"] = resultQueueUrl;

NJson::TJsonValue entry0;
entry0["Id"] = "Id-0";
entry0["ReceiptHandle"] = receiptHandle0;
entry0["VisibilityTimeout"] = 1;

NJson::TJsonValue entry1;
entry1["Id"] = "Id-1";
entry1["ReceiptHandle"] = receiptHandle1;
entry1["VisibilityTimeout"] = 2;

NJson::TJsonArray changeVisibilityEntries = {entry0, entry1};
changeMessageVisibilityBatchReq["Entries"] = changeVisibilityEntries;

res = SendHttpRequest(
"/Root", "AmazonSQS.ChangeMessageVisibilityBatch",
std::move(changeMessageVisibilityBatchReq),
FormAuthorizationStr("ru-central1")
);
UNIT_ASSERT_VALUES_EQUAL(res.HttpCode, 200);
UNIT_ASSERT(NJson::ReadJsonTree(res.Body, &json));
UNIT_ASSERT_VALUES_EQUAL(json["Successful"].GetArray().size(), 2);
UNIT_ASSERT_VALUES_EQUAL(json["Successful"][0]["Id"], "Id-0");
UNIT_ASSERT_VALUES_EQUAL(json["Successful"][1]["Id"], "Id-1");
}
} // Y_UNIT_TEST_SUITE(TestHttpProxy)
34 changes: 17 additions & 17 deletions ydb/public/api/protos/draft/ymq.proto
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ message ChangeMessageVisibilityResult {
message ChangeMessageVisibilityBatchRequestEntry {
string id = 1;
string receipt_handle = 2;
int32 visibility_timeout = 3;
optional int32 visibility_timeout = 3;
}

message ChangeMessageVisibilityBatchRequest {
Expand Down Expand Up @@ -128,7 +128,7 @@ message GetQueueAttributesResult {
message GetQueueUrlRequest {
Ydb.Operations.OperationParams operation_params = 1;
string queue_name = 2;
string queue_owner_aws_account_id = 3;
optional string queue_owner_aws_account_id = 3;
}

message GetQueueUrlResponse {
Expand All @@ -141,9 +141,9 @@ message GetQueueUrlResult {

message ListQueuesRequest {
Ydb.Operations.OperationParams operation_params = 1;
int64 max_results = 2;
string next_token = 3;
string queue_name_prefix = 4;
optional int64 max_results = 2;
optional string next_token = 3;
optional string queue_name_prefix = 4;
}

message ListQueuesResponse {
Expand Down Expand Up @@ -178,13 +178,13 @@ message MessageAttribute {
message ReceiveMessageRequest {
Ydb.Operations.OperationParams operation_params = 1;
repeated string attribute_names = 2;
int32 max_number_of_messages = 3;
optional int32 max_number_of_messages = 3;
repeated string message_attribute_names = 4;
repeated string message_system_attribute_names = 5;
string queue_url = 6;
string receive_request_attempt_id = 7;
int32 visibility_timeout = 8;
int32 wait_time_seconds = 9;
optional string receive_request_attempt_id = 7;
optional int32 visibility_timeout = 8;
optional int32 wait_time_seconds = 9;
}

message ReceiveMessageResponse {
Expand All @@ -207,11 +207,11 @@ message ReceiveMessageResult {

message SendMessageRequest {
Ydb.Operations.OperationParams operation_params = 1;
int32 delay_seconds = 2;
optional int32 delay_seconds = 2;
map<string, MessageAttribute> message_attributes = 3;
string message_body = 4;
string message_deduplication_id = 5;
string message_group_id = 6;
optional string message_deduplication_id = 5;
optional string message_group_id = 6;
map<string, MessageAttribute> message_system_attributes = 7;
string queue_url = 8;
}
Expand All @@ -237,11 +237,11 @@ message BatchResultErrorEntry {

message SendMessageBatchRequestEntry {
string id = 1;
int32 delay_seconds = 2;
optional int32 delay_seconds = 2;
map<string, MessageAttribute> message_attributes = 3;
string message_body = 4;
string message_deduplication_id = 5;
string message_group_id = 6;
optional string message_deduplication_id = 5;
optional string message_group_id = 6;
map<string, MessageAttribute> message_system_attributes = 7;
string queue_url = 8;
}
Expand Down Expand Up @@ -285,8 +285,8 @@ message SetQueueAttributesResult {

message ListDeadLetterSourceQueuesRequest {
Ydb.Operations.OperationParams operation_params = 1;
int32 max_results = 2;
string next_token = 3;
optional int32 max_results = 2;
optional string next_token = 3;
string queue_url = 4;
}

Expand Down
71 changes: 59 additions & 12 deletions ydb/services/ymq/ymq_proxy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -267,17 +267,30 @@ namespace NKikimr::NYmq::V1 {
private:
NKikimr::NSQS::TSendMessageRequest* GetRequest(THolder<TSqsRequest>& requestHolder) override {
auto result = requestHolder->MutableSendMessage();

for (auto& srcAttribute: GetProtoRequest()->Getmessage_attributes()) {
auto dstAttribute = result->MutableMessageAttributes()->Add();
dstAttribute->SetName(srcAttribute.first);
dstAttribute->SetStringValue(srcAttribute.second.Getstring_value());
dstAttribute->SetBinaryValue(srcAttribute.second.Getbinary_value());
dstAttribute->SetDataType(srcAttribute.second.Getdata_type());
}
result->SetMessageDeduplicationId(GetProtoRequest()->message_deduplication_id());
result->SetMessageGroupId(GetProtoRequest()->message_group_id());
result->SetQueueName(CloudIdAndResourceIdFromQueueUrl(GetProtoRequest()->queue_url())->second);
result->SetMessageBody(GetProtoRequest()->message_body());

if (GetProtoRequest()->Hasdelay_seconds()) {
result->SetDelaySeconds(GetProtoRequest()->Getdelay_seconds());
}

if (GetProtoRequest()->Hasmessage_deduplication_id()) {
result->SetMessageDeduplicationId(GetProtoRequest()->Getmessage_deduplication_id());
}

if (GetProtoRequest()->Hasmessage_group_id()) {
result->SetMessageGroupId(GetProtoRequest()->message_group_id());
}

result->SetQueueName(CloudIdAndResourceIdFromQueueUrl(GetProtoRequest()->Getqueue_url())->second);

result->SetMessageBody(GetProtoRequest()->Getmessage_body());

return result;
}
Expand Down Expand Up @@ -376,14 +389,29 @@ namespace NKikimr::NYmq::V1 {
result->SetAttributeName(i, attributeNames[i]);
}
}
result->SetMaxNumberOfMessages(GetProtoRequest()->max_number_of_messages());

if (GetProtoRequest()->Hasmax_number_of_messages()) {
result->SetMaxNumberOfMessages(GetProtoRequest()->Getmax_number_of_messages());
}

for (int i = 0; i < GetProtoRequest()->Getmessage_attribute_names().size(); i++) {
result->SetMessageAttributeName(i, GetProtoRequest()->Getmessage_attribute_names()[i]);
}

result->SetQueueName(CloudIdAndResourceIdFromQueueUrl(GetProtoRequest()->queue_url())->second);
result->SetReceiveRequestAttemptId(GetProtoRequest()->Getreceive_request_attempt_id());
result->SetVisibilityTimeout(GetProtoRequest()->Getvisibility_timeout());
result->SetWaitTimeSeconds(GetProtoRequest()->Getwait_time_seconds());

if (GetProtoRequest()->Hasreceive_request_attempt_id()) {
result->SetReceiveRequestAttemptId(GetProtoRequest()->Getreceive_request_attempt_id());
}

if (GetProtoRequest()->Hasvisibility_timeout()) {
result->SetVisibilityTimeout(GetProtoRequest()->Getvisibility_timeout());
}

if (GetProtoRequest()->Haswait_time_seconds()) {
result->SetWaitTimeSeconds(GetProtoRequest()->Getwait_time_seconds());
}

return result;
}

Expand Down Expand Up @@ -559,7 +587,9 @@ namespace NKikimr::NYmq::V1 {
private:
NKikimr::NSQS::TListQueuesRequest* GetRequest(THolder<TSqsRequest>& requestHolder) override {
auto result = requestHolder->MutableListQueues();
result->SetQueueNamePrefix(GetProtoRequest()->Getqueue_name_prefix());
if (GetProtoRequest()->Hasqueue_name_prefix()) {
result->SetQueueNamePrefix(GetProtoRequest()->Getqueue_name_prefix());
}
return result;
}
};
Expand Down Expand Up @@ -818,19 +848,34 @@ namespace NKikimr::NYmq::V1 {
private:
NKikimr::NSQS::TSendMessageBatchRequest* GetRequest(THolder<TSqsRequest>& requestHolder) override {
auto result = requestHolder->MutableSendMessageBatch();

result->SetQueueName(CloudIdAndResourceIdFromQueueUrl(GetProtoRequest()->Getqueue_url())->second);

for (auto& requestEntry : GetProtoRequest()->Getentries()) {
auto entry = requestHolder->MutableSendMessageBatch()->MutableEntries()->Add();

if (requestEntry.Hasdelay_seconds()) {
entry->SetDelaySeconds(requestEntry.Getdelay_seconds());
}

entry->SetId(requestEntry.Getid());

for (auto& srcAttribute: requestEntry.Getmessage_attributes()) {
auto dstAttribute = entry->MutableMessageAttributes()->Add();
dstAttribute->SetName(srcAttribute.first);
dstAttribute->SetStringValue(srcAttribute.second.Getstring_value());
dstAttribute->SetBinaryValue(srcAttribute.second.Getbinary_value());
dstAttribute->SetDataType(srcAttribute.second.Getdata_type());
}
entry->SetMessageDeduplicationId(requestEntry.Getmessage_deduplication_id());
entry->SetMessageGroupId(requestEntry.Getmessage_group_id());

if (requestEntry.Hasmessage_deduplication_id()) {
entry->SetMessageDeduplicationId(requestEntry.Getmessage_deduplication_id());
}

if (requestEntry.Hasmessage_group_id()) {
entry->SetMessageGroupId(requestEntry.Getmessage_group_id());
}

entry->SetMessageBody(requestEntry.Getmessage_body());
}
return result;
Expand Down Expand Up @@ -935,7 +980,9 @@ namespace NKikimr::NYmq::V1 {
for (auto& requestEntry : GetProtoRequest()->Getentries()) {
auto entry = requestHolder->MutableChangeMessageVisibilityBatch()->MutableEntries()->Add();
entry->SetId(requestEntry.Getid());
entry->SetVisibilityTimeout(requestEntry.Getvisibility_timeout());
if (requestEntry.Hasvisibility_timeout()) {
entry->SetVisibilityTimeout(requestEntry.Getvisibility_timeout());
}
entry->SetReceiptHandle(requestEntry.Getreceipt_handle());
}
return result;
Expand Down
Loading