Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reset link credit on last transfer legally received #465

Merged
merged 3 commits into from
Jun 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions src/link.c
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@
#include "azure_uamqp_c/amqp_frame_codec.h"
#include "azure_uamqp_c/async_operation.h"

#define DEFAULT_LINK_CREDIT 10000
#define DEFAULT_LINK_CREDIT 10
#define RECEIVER_MIN_LINK_CREDIT 1

typedef struct DELIVERY_INSTANCE_TAG
{
Expand Down Expand Up @@ -423,7 +424,7 @@ static void link_frame_received(void* context, AMQP_VALUE performative, uint32_t
bool more;
bool is_error;

if (link_instance->current_link_credit == 0)
if (link_instance->current_link_credit <= RECEIVER_MIN_LINK_CREDIT)
{
link_instance->current_link_credit = link_instance->max_link_credit;
send_flow(link_instance);
Expand Down
245 changes: 238 additions & 7 deletions tests/link_ut/link_ut.c
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include "umock_c/umock_c.h"
#include "umock_c/umock_c_negative_tests.h"
#include "umock_c/umocktypes_bool.h"
#include "umock_c/umocktypes_stdint.h"

static void* my_gballoc_malloc(size_t size)
{
Expand Down Expand Up @@ -70,7 +71,7 @@ static void on_umock_c_error(UMOCK_C_ERROR_CODE error_code)
ASSERT_FAIL("umock_c reported error :%" PRI_MU_ENUM "", MU_ENUM_VALUE(UMOCK_C_ERROR_CODE, error_code));
}

static int umocktypes_copy_bool_ptr(bool** destination, const bool** source)
static int umocktypes_copy_bool_ptr(bool** destination, const bool* source)
{
int result;

Expand All @@ -81,7 +82,7 @@ static int umocktypes_copy_bool_ptr(bool** destination, const bool** source)
}
else
{
*(*destination) = *(*source);
*(*destination) = (*source);

result = 0;
}
Expand Down Expand Up @@ -144,6 +145,9 @@ static int umocktypes_are_equal_bool_ptr(bool** left, bool** right)
return result;
}


// bool FLOW_HANDLE functions

static int umocktypes_copy_FLOW_HANDLE(FLOW_HANDLE* destination, const FLOW_HANDLE* source)
{
int result = 0;
Expand Down Expand Up @@ -194,6 +198,58 @@ static int umocktypes_are_equal_FLOW_HANDLE(FLOW_HANDLE* left, FLOW_HANDLE* righ
return result;
}

// TRANSFER umock functions

static int umocktypes_copy_TRANSFER_HANDLE(TRANSFER_HANDLE* destination, const TRANSFER_HANDLE* source)
{
int result = 0;

*(destination) = *(source);

return result;
}

static void umocktypes_free_TRANSFER_HANDLE(TRANSFER_HANDLE* value)
{
(void)value;
}

static char* umocktypes_stringify_TRANSFER_HANDLE(const TRANSFER_HANDLE* value)
{
char temp_buffer[32];
char* result;
size_t length = sprintf(temp_buffer, "%p", (void*)*value);
if (length < 0)
{
result = NULL;
}
else
{
result = (char*)malloc(length + 1);
if (result != NULL)
{
(void)memcpy(result, temp_buffer, length + 1);
}
}
return result;
}

static int umocktypes_are_equal_TRANSFER_HANDLE(TRANSFER_HANDLE* left, TRANSFER_HANDLE* right)
{
int result;

if (*left == *right)
{
result = 1;
}
else
{
result = 0;
}

return result;
}

static TRANSFER_HANDLE test_on_transfer_received_transfer;
static uint32_t test_on_transfer_received_payload_size;
static unsigned char test_on_transfer_received_payload_bytes[2048];
Expand Down Expand Up @@ -237,13 +293,14 @@ static LINK_HANDLE create_link(role link_role)
return link_create(TEST_SESSION_HANDLE, TEST_LINK_NAME_1, link_role, TEST_LINK_SOURCE, TEST_LINK_TARGET);
}

static int attach_link(LINK_HANDLE link, ON_ENDPOINT_FRAME_RECEIVED* on_frame_received)
static int attach_link(LINK_HANDLE link, ON_ENDPOINT_FRAME_RECEIVED* on_frame_received, ON_SESSION_STATE_CHANGED* on_session_state_changed)
{
umock_c_reset_all_calls();

STRICT_EXPECTED_CALL(session_begin(TEST_SESSION_HANDLE));
STRICT_EXPECTED_CALL(session_start_link_endpoint(TEST_LINK_ENDPOINT, IGNORED_PTR_ARG, IGNORED_PTR_ARG, IGNORED_PTR_ARG, link))
.CaptureArgumentValue_frame_received_callback(on_frame_received);
.CaptureArgumentValue_frame_received_callback(on_frame_received)
.CaptureArgumentValue_on_session_state_changed(on_session_state_changed);

return link_attach(link, test_on_transfer_received, test_on_link_state_changed, test_on_link_flow_on, NULL);
}
Expand All @@ -262,14 +319,26 @@ TEST_SUITE_INITIALIZE(suite_init)
result = umocktypes_bool_register_types();
ASSERT_ARE_EQUAL(int, 0, result, "Failed registering bool types");

result = umocktypes_stdint_register_types();
ASSERT_ARE_EQUAL(int, 0, result, "Failed registering stdint types");

REGISTER_GLOBAL_MOCK_HOOK(gballoc_malloc, my_gballoc_malloc);
REGISTER_GLOBAL_MOCK_HOOK(gballoc_calloc, my_gballoc_calloc);
REGISTER_GLOBAL_MOCK_HOOK(gballoc_realloc, my_gballoc_realloc);
REGISTER_GLOBAL_MOCK_HOOK(gballoc_free, my_gballoc_free);
REGISTER_UMOCK_ALIAS_TYPE(transfer_number, uint32_t);
REGISTER_UMOCK_ALIAS_TYPE(role, bool);
REGISTER_UMOCK_ALIAS_TYPE(delivery_number, uint32_t);
REGISTER_UMOCK_ALIAS_TYPE(sequence_no, uint32_t);
REGISTER_UMOCK_ALIAS_TYPE(handle, uint32_t);
REGISTER_UMOCK_ALIAS_TYPE(sender_settle_mode, uint8_t);
REGISTER_UMOCK_ALIAS_TYPE(receiver_settle_mode, uint8_t);
REGISTER_UMOCK_ALIAS_TYPE(DISPOSITION_HANDLE, void*);
REGISTER_UMOCK_ALIAS_TYPE(AMQP_VALUE, void*);
REGISTER_UMOCK_ALIAS_TYPE(TICK_COUNTER_HANDLE, void*);
REGISTER_UMOCK_ALIAS_TYPE(SINGLYLINKEDLIST_HANDLE, void*);
REGISTER_UMOCK_ALIAS_TYPE(SESSION_HANDLE, void*);
REGISTER_UMOCK_ALIAS_TYPE(ATTACH_HANDLE, void*);
REGISTER_UMOCK_ALIAS_TYPE(LINK_ENDPOINT_HANDLE, void*);
REGISTER_UMOCK_ALIAS_TYPE(ON_LINK_ENDPOINT_DESTROYED_CALLBACK, void*);
REGISTER_UMOCK_ALIAS_TYPE(ON_ENDPOINT_FRAME_RECEIVED, void*);
Expand All @@ -285,7 +354,9 @@ TEST_SUITE_INITIALIZE(suite_init)
REGISTER_GLOBAL_MOCK_RETURNS(session_start_link_endpoint, 0, 1);

REGISTER_TYPE(FLOW_HANDLE, FLOW_HANDLE);
REGISTER_TYPE(TRANSFER_HANDLE, TRANSFER_HANDLE);
REGISTER_TYPE(bool*, bool_ptr);
REGISTER_TYPE(uint32_t, uint32_t);
}

TEST_SUITE_CLEANUP(suite_cleanup)
Expand Down Expand Up @@ -367,7 +438,8 @@ TEST_FUNCTION(link_receiver_frame_received_succeeds)
// arrange
LINK_HANDLE link = create_link(role_receiver);
ON_ENDPOINT_FRAME_RECEIVED on_frame_received = NULL;
int attach_result = attach_link(link, &on_frame_received);
ON_SESSION_STATE_CHANGED on_session_state_changed = NULL;
int attach_result = attach_link(link, &on_frame_received, &on_session_state_changed);
ASSERT_ARE_EQUAL(int, 0, attach_result);

AMQP_VALUE performative = (AMQP_VALUE)0x5000;
Expand Down Expand Up @@ -402,7 +474,8 @@ TEST_FUNCTION(link_sender_frame_received_succeeds)
// arrange
LINK_HANDLE link = create_link(role_sender);
ON_ENDPOINT_FRAME_RECEIVED on_frame_received = NULL;
int attach_result = attach_link(link, &on_frame_received);
ON_SESSION_STATE_CHANGED on_session_state_changed = NULL;
int attach_result = attach_link(link, &on_frame_received, &on_session_state_changed);
ASSERT_ARE_EQUAL(int, 0, attach_result);

AMQP_VALUE performative = (AMQP_VALUE)0x5000;
Expand Down Expand Up @@ -443,7 +516,8 @@ TEST_FUNCTION(link_receiver_frame_received_get_flow_fails_no_double_free_fails)
// arrange
LINK_HANDLE link = create_link(role_receiver);
ON_ENDPOINT_FRAME_RECEIVED on_frame_received = NULL;
int attach_result = attach_link(link, &on_frame_received);
ON_SESSION_STATE_CHANGED on_session_state_changed = NULL;
int attach_result = attach_link(link, &on_frame_received, &on_session_state_changed);
ASSERT_ARE_EQUAL(int, 0, attach_result);

AMQP_VALUE performative = (AMQP_VALUE)0x5000;
Expand Down Expand Up @@ -473,5 +547,162 @@ TEST_FUNCTION(link_receiver_frame_received_get_flow_fails_no_double_free_fails)
link_destroy(link);
}

TEST_FUNCTION(link_receiver_link_credit_replenish_succeeds)
{
// arrange
LINK_HANDLE link = create_link(role_receiver);
ASSERT_ARE_EQUAL(int, 0, link_set_max_link_credit(link, 5));

ON_ENDPOINT_FRAME_RECEIVED on_frame_received = NULL;
ON_SESSION_STATE_CHANGED on_session_state_changed = NULL;
int attach_result = attach_link(link, &on_frame_received, &on_session_state_changed);
ASSERT_ARE_EQUAL(int, 0, attach_result);

ATTACH_HANDLE attach = (ATTACH_HANDLE)0x4999;
AMQP_VALUE performative = (AMQP_VALUE)0x5000;
AMQP_VALUE descriptor = (AMQP_VALUE)0x5001;
FLOW_HANDLE flow = (FLOW_HANDLE)0x5002;
TRANSFER_HANDLE transfer = (TRANSFER_HANDLE)0x5003;
uint32_t frame_payload_size = 30;
const unsigned char payload_bytes[30] = { 0 };
bool more = false;
DISPOSITION_HANDLE disposition = (DISPOSITION_HANDLE)0x5005;

umock_c_reset_all_calls();
STRICT_EXPECTED_CALL(attach_create(IGNORED_PTR_ARG, IGNORED_NUM_ARG, IGNORED_NUM_ARG))
.SetReturn(attach);
STRICT_EXPECTED_CALL(attach_set_snd_settle_mode(IGNORED_PTR_ARG, IGNORED_NUM_ARG));
STRICT_EXPECTED_CALL(attach_set_rcv_settle_mode(IGNORED_PTR_ARG, IGNORED_NUM_ARG));
STRICT_EXPECTED_CALL(attach_set_role(IGNORED_PTR_ARG, IGNORED_NUM_ARG));
STRICT_EXPECTED_CALL(attach_set_source(IGNORED_PTR_ARG, IGNORED_PTR_ARG));
STRICT_EXPECTED_CALL(attach_set_target(IGNORED_PTR_ARG, IGNORED_PTR_ARG));

STRICT_EXPECTED_CALL(attach_set_max_message_size(IGNORED_PTR_ARG, IGNORED_NUM_ARG));
STRICT_EXPECTED_CALL(session_send_attach(IGNORED_PTR_ARG, IGNORED_PTR_ARG));
STRICT_EXPECTED_CALL(attach_destroy(IGNORED_PTR_ARG));

on_session_state_changed(link, SESSION_STATE_MAPPED, SESSION_STATE_UNMAPPED);
ASSERT_ARE_EQUAL(char_ptr, umock_c_get_expected_calls(), umock_c_get_actual_calls());

umock_c_reset_all_calls();
STRICT_EXPECTED_CALL(amqpvalue_get_inplace_descriptor(performative))
.SetReturn(descriptor);
STRICT_EXPECTED_CALL(is_attach_type_by_descriptor(IGNORED_PTR_ARG))
.SetReturn(false);
STRICT_EXPECTED_CALL(is_flow_type_by_descriptor(IGNORED_PTR_ARG))
.SetReturn(1);
STRICT_EXPECTED_CALL(amqpvalue_get_flow(IGNORED_PTR_ARG, IGNORED_PTR_ARG))
.CopyOutArgumentBuffer(2, &flow, sizeof(flow));
STRICT_EXPECTED_CALL(flow_destroy(IGNORED_PTR_ARG));
on_frame_received(link, performative, frame_payload_size, payload_bytes);
ASSERT_ARE_EQUAL(char_ptr, umock_c_get_expected_calls(), umock_c_get_actual_calls());

umock_c_reset_all_calls();
// First transfer results in FLOW to set link credit.
STRICT_EXPECTED_CALL(amqpvalue_get_inplace_descriptor(performative))
.SetReturn(descriptor);
STRICT_EXPECTED_CALL(is_attach_type_by_descriptor(IGNORED_PTR_ARG))
.SetReturn(false);
STRICT_EXPECTED_CALL(is_flow_type_by_descriptor(IGNORED_PTR_ARG))
.SetReturn(false);
STRICT_EXPECTED_CALL(is_transfer_type_by_descriptor(IGNORED_PTR_ARG))
.SetReturn(true);
STRICT_EXPECTED_CALL(amqpvalue_get_transfer(IGNORED_PTR_ARG, IGNORED_PTR_ARG))
.CopyOutArgumentBuffer(2, &transfer, sizeof(transfer));
// send_flow
STRICT_EXPECTED_CALL(flow_create(IGNORED_NUM_ARG, IGNORED_NUM_ARG, IGNORED_NUM_ARG))
.SetReturn(flow);
STRICT_EXPECTED_CALL(flow_set_link_credit(IGNORED_PTR_ARG, IGNORED_NUM_ARG));
STRICT_EXPECTED_CALL(flow_set_handle(IGNORED_PTR_ARG, IGNORED_NUM_ARG));
STRICT_EXPECTED_CALL(flow_set_delivery_count(IGNORED_PTR_ARG, IGNORED_NUM_ARG));
STRICT_EXPECTED_CALL(session_send_flow(IGNORED_PTR_ARG, IGNORED_PTR_ARG));
STRICT_EXPECTED_CALL(flow_destroy(IGNORED_PTR_ARG));
// continue processing TRANSFER
STRICT_EXPECTED_CALL(transfer_get_more(IGNORED_PTR_ARG, IGNORED_PTR_ARG))
.CopyOutArgumentBuffer(2, &more, sizeof(bool));
STRICT_EXPECTED_CALL(transfer_get_delivery_id(IGNORED_PTR_ARG, IGNORED_PTR_ARG));
STRICT_EXPECTED_CALL(disposition_create(IGNORED_NUM_ARG, IGNORED_NUM_ARG))
.SetReturn(disposition);
STRICT_EXPECTED_CALL(disposition_set_last(IGNORED_PTR_ARG, IGNORED_NUM_ARG));
STRICT_EXPECTED_CALL(disposition_set_settled(IGNORED_PTR_ARG, IGNORED_NUM_ARG));
STRICT_EXPECTED_CALL(disposition_set_state(IGNORED_PTR_ARG, IGNORED_PTR_ARG));
STRICT_EXPECTED_CALL(session_send_disposition(IGNORED_PTR_ARG, IGNORED_PTR_ARG));
STRICT_EXPECTED_CALL(disposition_destroy(IGNORED_PTR_ARG));
STRICT_EXPECTED_CALL(amqpvalue_destroy(IGNORED_PTR_ARG));
STRICT_EXPECTED_CALL(transfer_destroy(IGNORED_PTR_ARG));
on_frame_received(link, performative, frame_payload_size, payload_bytes);
ASSERT_ARE_EQUAL(char_ptr, umock_c_get_expected_calls(), umock_c_get_actual_calls());

umock_c_reset_all_calls();

// act
// First 3 transfers result in no FLOW.
for (int i = 0; i < 3; i++)
{
STRICT_EXPECTED_CALL(amqpvalue_get_inplace_descriptor(performative))
.SetReturn(descriptor);
STRICT_EXPECTED_CALL(is_attach_type_by_descriptor(IGNORED_PTR_ARG))
.SetReturn(false);
STRICT_EXPECTED_CALL(is_flow_type_by_descriptor(IGNORED_PTR_ARG))
.SetReturn(false);
STRICT_EXPECTED_CALL(is_transfer_type_by_descriptor(IGNORED_PTR_ARG))
.SetReturn(true);
STRICT_EXPECTED_CALL(amqpvalue_get_transfer(IGNORED_PTR_ARG, IGNORED_PTR_ARG))
.CopyOutArgumentBuffer(2, &transfer, sizeof(transfer));
STRICT_EXPECTED_CALL(transfer_get_more(IGNORED_PTR_ARG, IGNORED_PTR_ARG))
.CopyOutArgumentBuffer(2, &more, sizeof(bool));
STRICT_EXPECTED_CALL(transfer_get_delivery_id(IGNORED_PTR_ARG, IGNORED_PTR_ARG));
STRICT_EXPECTED_CALL(disposition_create(IGNORED_NUM_ARG, IGNORED_NUM_ARG))
.SetReturn(disposition);
STRICT_EXPECTED_CALL(disposition_set_last(IGNORED_PTR_ARG, IGNORED_NUM_ARG));
STRICT_EXPECTED_CALL(disposition_set_settled(IGNORED_PTR_ARG, IGNORED_NUM_ARG));
STRICT_EXPECTED_CALL(disposition_set_state(IGNORED_PTR_ARG, IGNORED_PTR_ARG));
STRICT_EXPECTED_CALL(session_send_disposition(IGNORED_PTR_ARG, IGNORED_PTR_ARG));
STRICT_EXPECTED_CALL(disposition_destroy(IGNORED_PTR_ARG));
STRICT_EXPECTED_CALL(amqpvalue_destroy(IGNORED_PTR_ARG));
STRICT_EXPECTED_CALL(transfer_destroy(IGNORED_PTR_ARG));
on_frame_received(link, performative, frame_payload_size, payload_bytes);
}

// And 4th does result in FLOW.
STRICT_EXPECTED_CALL(amqpvalue_get_inplace_descriptor(performative))
.SetReturn(descriptor);
STRICT_EXPECTED_CALL(is_attach_type_by_descriptor(IGNORED_PTR_ARG))
.SetReturn(false);
STRICT_EXPECTED_CALL(is_flow_type_by_descriptor(IGNORED_PTR_ARG))
.SetReturn(false);
STRICT_EXPECTED_CALL(is_transfer_type_by_descriptor(IGNORED_PTR_ARG))
.SetReturn(true);
STRICT_EXPECTED_CALL(amqpvalue_get_transfer(IGNORED_PTR_ARG, IGNORED_PTR_ARG))
.CopyOutArgumentBuffer(2, &transfer, sizeof(transfer));
// send_flow
STRICT_EXPECTED_CALL(flow_create(IGNORED_NUM_ARG, IGNORED_NUM_ARG, IGNORED_NUM_ARG))
.SetReturn(flow);
STRICT_EXPECTED_CALL(flow_set_link_credit(IGNORED_PTR_ARG, IGNORED_NUM_ARG));
STRICT_EXPECTED_CALL(flow_set_handle(IGNORED_PTR_ARG, IGNORED_NUM_ARG));
STRICT_EXPECTED_CALL(flow_set_delivery_count(IGNORED_PTR_ARG, IGNORED_NUM_ARG));
STRICT_EXPECTED_CALL(session_send_flow(IGNORED_PTR_ARG, IGNORED_PTR_ARG));
STRICT_EXPECTED_CALL(flow_destroy(IGNORED_PTR_ARG));
// continue processing TRANSFER
STRICT_EXPECTED_CALL(transfer_get_more(IGNORED_PTR_ARG, IGNORED_PTR_ARG))
.CopyOutArgumentBuffer(2, &more, sizeof(bool));
STRICT_EXPECTED_CALL(transfer_get_delivery_id(IGNORED_PTR_ARG, IGNORED_PTR_ARG));
STRICT_EXPECTED_CALL(disposition_create(IGNORED_NUM_ARG, IGNORED_NUM_ARG))
.SetReturn(disposition);
STRICT_EXPECTED_CALL(disposition_set_last(IGNORED_PTR_ARG, IGNORED_NUM_ARG));
STRICT_EXPECTED_CALL(disposition_set_settled(IGNORED_PTR_ARG, IGNORED_NUM_ARG));
STRICT_EXPECTED_CALL(disposition_set_state(IGNORED_PTR_ARG, IGNORED_PTR_ARG));
STRICT_EXPECTED_CALL(session_send_disposition(IGNORED_PTR_ARG, IGNORED_PTR_ARG));
STRICT_EXPECTED_CALL(disposition_destroy(IGNORED_PTR_ARG));
STRICT_EXPECTED_CALL(amqpvalue_destroy(IGNORED_PTR_ARG));
STRICT_EXPECTED_CALL(transfer_destroy(IGNORED_PTR_ARG));
on_frame_received(link, performative, frame_payload_size, payload_bytes);

// assert
ASSERT_ARE_EQUAL(char_ptr, umock_c_get_expected_calls(), umock_c_get_actual_calls());

// cleanup
link_destroy(link);
}

END_TEST_SUITE(link_ut)