Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

added subscription_error_count reset / AMQP IOTHUB_CLIENT_CONNECTION_QUOTA_EXCEEDED #2267

Merged
merged 14 commits into from
Mar 29, 2022
3 changes: 2 additions & 1 deletion iothub_client/inc/internal/iothubtransport_amqp_device.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,8 @@ MU_DEFINE_ENUM_WITHOUT_INVALID(DEVICE_SEND_STATUS, DEVICE_SEND_STATUS_VALUES);
D2C_EVENT_SEND_COMPLETE_RESULT_ERROR_FAIL_SENDING, \
D2C_EVENT_SEND_COMPLETE_RESULT_ERROR_TIMEOUT, \
D2C_EVENT_SEND_COMPLETE_RESULT_DEVICE_DESTROYED, \
D2C_EVENT_SEND_COMPLETE_RESULT_ERROR_UNKNOWN
D2C_EVENT_SEND_COMPLETE_RESULT_ERROR_UNKNOWN, \
D2C_EVENT_SEND_COMPLETE_RESULT_ERROR_QUOTA_EXCEEDED

MU_DEFINE_ENUM_WITHOUT_INVALID(D2C_EVENT_SEND_RESULT, D2C_EVENT_SEND_RESULT_VALUES);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ typedef enum TELEMETRY_MESSENGER_EVENT_SEND_COMPLETE_RESULT_TAG
TELEMETRY_MESSENGER_EVENT_SEND_COMPLETE_RESULT_ERROR_CANNOT_PARSE,
TELEMETRY_MESSENGER_EVENT_SEND_COMPLETE_RESULT_ERROR_FAIL_SENDING,
TELEMETRY_MESSENGER_EVENT_SEND_COMPLETE_RESULT_ERROR_TIMEOUT,
TELEMETRY_MESSENGER_EVENT_SEND_COMPLETE_RESULT_MESSENGER_DESTROYED
TELEMETRY_MESSENGER_EVENT_SEND_COMPLETE_RESULT_MESSENGER_DESTROYED,
TELEMETRY_MESSENGER_EVENT_SEND_COMPLETE_RESULT_ERROR_QUOTA_EXCEEDED
} TELEMETRY_MESSENGER_EVENT_SEND_COMPLETE_RESULT;

typedef enum TELEMETRY_MESSENGER_DISPOSITION_RESULT_TAG
Expand Down
3 changes: 2 additions & 1 deletion iothub_client/inc/iothub_client_core_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,8 @@ extern "C"
IOTHUB_CLIENT_CONNECTION_NO_NETWORK, \
IOTHUB_CLIENT_CONNECTION_COMMUNICATION_ERROR, \
IOTHUB_CLIENT_CONNECTION_OK, \
IOTHUB_CLIENT_CONNECTION_NO_PING_RESPONSE \
IOTHUB_CLIENT_CONNECTION_NO_PING_RESPONSE, \
IOTHUB_CLIENT_CONNECTION_QUOTA_EXCEEDED \

/** @brief Enumeration passed to the application callback indicating reason that connection was unsuccessful.
*/
Expand Down
13 changes: 12 additions & 1 deletion iothub_client/src/iothubtransport_amqp_common.c
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ typedef struct AMQP_TRANSPORT_DEVICE_INSTANCE_TAG
bool subscribe_methods_needed; // Indicates if should subscribe for device methods.
// is the transport subscribed for methods?
bool subscribed_for_methods; // Indicates if device is subscribed for device methods.

bool is_quota_exceeded;
TRANSPORT_CALLBACKS_INFO transport_callbacks;
void* transport_ctx;
} AMQP_TRANSPORT_DEVICE_INSTANCE;
Expand Down Expand Up @@ -292,6 +292,10 @@ static void on_device_state_changed_callback(void* context, DEVICE_STATE previou
{
registered_device->transport_callbacks.connection_status_cb(IOTHUB_CLIENT_CONNECTION_UNAUTHENTICATED, IOTHUB_CLIENT_CONNECTION_OK, registered_device->transport_ctx);
}
else if (registered_device->is_quota_exceeded)
{
registered_device->transport_callbacks.connection_status_cb(IOTHUB_CLIENT_CONNECTION_UNAUTHENTICATED, IOTHUB_CLIENT_CONNECTION_QUOTA_EXCEEDED, registered_device->transport_ctx);
}
}
else if (new_state == DEVICE_STATE_ERROR_AUTH)
{
Expand Down Expand Up @@ -807,6 +811,7 @@ static void prepare_device_for_connection_retry(AMQP_TRANSPORT_DEVICE_INSTANCE*

registered_device->number_of_previous_failures = 0;
registered_device->number_of_send_event_complete_failures = 0;
registered_device->is_quota_exceeded = false;
}

void prepare_for_connection_retry(AMQP_TRANSPORT_INSTANCE* transport_instance)
Expand Down Expand Up @@ -945,6 +950,11 @@ static void on_event_send_complete(IOTHUB_MESSAGE_LIST* message, D2C_EVENT_SEND_
registered_device->number_of_send_event_complete_failures = 0;
}

if (result == D2C_EVENT_SEND_COMPLETE_RESULT_ERROR_QUOTA_EXCEEDED)
{
registered_device->is_quota_exceeded = true;
}

if (message->callback != NULL)
{
IOTHUB_CLIENT_CONFIRMATION_RESULT iothub_send_result = get_iothub_client_confirmation_result_from(result);
Expand Down Expand Up @@ -2040,6 +2050,7 @@ IOTHUB_DEVICE_HANDLE IoTHubTransport_AMQP_Common_Register(TRANSPORT_LL_HANDLE ha
amqp_device_instance->max_state_change_timeout_secs = DEFAULT_DEVICE_STATE_CHANGE_TIMEOUT_SECS;
amqp_device_instance->subscribe_methods_needed = false;
amqp_device_instance->subscribed_for_methods = false;
amqp_device_instance->is_quota_exceeded = false;
amqp_device_instance->transport_ctx = transport_instance->transport_ctx;
amqp_device_instance->transport_callbacks = transport_instance->transport_callbacks;

Expand Down
3 changes: 3 additions & 0 deletions iothub_client/src/iothubtransport_amqp_device.c
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,9 @@ static D2C_EVENT_SEND_RESULT get_d2c_event_send_result_from(TELEMETRY_MESSENGER_
case TELEMETRY_MESSENGER_EVENT_SEND_COMPLETE_RESULT_MESSENGER_DESTROYED:
d2c_esr = D2C_EVENT_SEND_COMPLETE_RESULT_DEVICE_DESTROYED;
break;
case TELEMETRY_MESSENGER_EVENT_SEND_COMPLETE_RESULT_ERROR_QUOTA_EXCEEDED:
d2c_esr = D2C_EVENT_SEND_COMPLETE_RESULT_ERROR_QUOTA_EXCEEDED;
break;
default:
// This is not expected. All states should be mapped.
d2c_esr = D2C_EVENT_SEND_COMPLETE_RESULT_ERROR_UNKNOWN;
Expand Down
45 changes: 43 additions & 2 deletions iothub_client/src/iothubtransport_amqp_telemetry_messenger.c
Original file line number Diff line number Diff line change
Expand Up @@ -964,7 +964,6 @@ static void invoke_callback(const void* item, const void* action_context, bool*

static void internal_on_event_send_complete_callback(void* context, MESSAGE_SEND_RESULT send_result, AMQP_VALUE delivery_state)
{
(void)delivery_state;
if (context != NULL)
{
MESSENGER_SEND_EVENT_TASK* task = (MESSENGER_SEND_EVENT_TASK*)context;
Expand All @@ -981,7 +980,49 @@ static void internal_on_event_send_complete_callback(void* context, MESSAGE_SEND
}
else
{
messenger_send_result = TELEMETRY_MESSENGER_EVENT_SEND_COMPLETE_RESULT_ERROR_FAIL_SENDING;
bool isResourceLimitExceeded = false;
uint32_t item_count;
int ret = amqpvalue_get_list_item_count(delivery_state, &item_count);
for (uint32_t i = 0; !isResourceLimitExceeded && ret == 0 && i < item_count; i++)
{
AMQP_VALUE delivery_state_item = amqpvalue_get_list_item(delivery_state, i);
if (delivery_state_item != NULL)
{
AMQP_VALUE item_properties = amqpvalue_get_inplace_described_value(delivery_state_item);
if (item_properties != NULL)
{
uint32_t item_properties_count = 0;
ret = amqpvalue_get_list_item_count(item_properties, &item_properties_count);
for (uint32_t t = 0; !isResourceLimitExceeded && ret == 0 && t < item_properties_count; t++)
{
AMQP_VALUE item_property = amqpvalue_get_list_item(item_properties, t);
if (item_property != NULL)
{
const char* symbol_value;
int ret_sym = amqpvalue_get_symbol(item_property, &symbol_value);
if (ret_sym == 0)
{
if (strcmp(symbol_value, "amqp:resource-limit-exceeded") == 0)
{
isResourceLimitExceeded = true;
}
}
amqpvalue_destroy(item_property);
}
}
}
amqpvalue_destroy(delivery_state_item);
}
}

if (isResourceLimitExceeded)
{
messenger_send_result = TELEMETRY_MESSENGER_EVENT_SEND_COMPLETE_RESULT_ERROR_QUOTA_EXCEEDED;
}
else
{
messenger_send_result = TELEMETRY_MESSENGER_EVENT_SEND_COMPLETE_RESULT_ERROR_FAIL_SENDING;
}
}

// Initially typecast to a size_t to avoid 64 bit compiler warnings on casting of void* to larger type.
Expand Down
1 change: 1 addition & 0 deletions iothub_client/src/iothubtransport_amqp_twin_messenger.c
Original file line number Diff line number Diff line change
Expand Up @@ -2005,6 +2005,7 @@ int twin_messenger_stop(TWIN_MESSENGER_HANDLE twin_msgr_handle)
}
else
{
twin_msgr->subscription_error_count = 0;
if (twin_msgr->subscription_state != TWIN_SUBSCRIPTION_STATE_UNSUBSCRIBE)
{
twin_msgr->subscription_state = TWIN_SUBSCRIPTION_STATE_GET_COMPLETE_PROPERTIES;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,10 @@ void real_free(void* ptr)
#define please_mock_amqpvalue_create_string MOCK_ENABLED
#define please_mock_amqpvalue_create_symbol MOCK_ENABLED
#define please_mock_amqpvalue_destroy MOCK_ENABLED
#define please_mock_amqpvalue_get_symbol MOCK_ENABLED
#define please_mock_amqpvalue_get_list_item_count MOCK_ENABLED
#define please_mock_amqpvalue_get_list_item MOCK_ENABLED
#define please_mock_amqpvalue_get_inplace_described_value MOCK_ENABLED
#define please_mock_amqpvalue_set_map_value MOCK_ENABLED
#define please_mock_link_create MOCK_ENABLED
#define please_mock_link_destroy MOCK_ENABLED
Expand Down Expand Up @@ -1146,8 +1150,16 @@ static void set_expected_calls_free_task(int number_callbacks)
EXPECTED_CALL(free(IGNORED_PTR_ARG));
}

static void set_expected_calls_for_on_message_send_complete(int number_callbacks)
static void set_expected_calls_for_on_message_send_complete(int number_callbacks, MESSAGE_SEND_RESULT message_send_result)
{
if (message_send_result == MESSAGE_SEND_ERROR)
{
uint32_t count = 0;
STRICT_EXPECTED_CALL(amqpvalue_get_list_item_count(IGNORED_NUM_ARG, IGNORED_PTR_ARG))
.CopyOutArgumentBuffer(2, &count, sizeof(count))
.SetReturn(0);
}

STRICT_EXPECTED_CALL(singlylinkedlist_foreach(IGNORED_PTR_ARG, IGNORED_PTR_ARG, IGNORED_PTR_ARG));

STRICT_EXPECTED_CALL(singlylinkedlist_find(TEST_IN_PROGRESS_LIST, IGNORED_PTR_ARG, IGNORED_PTR_ARG))
Expand Down Expand Up @@ -1919,6 +1931,11 @@ TEST_SUITE_INITIALIZE(TestClassInitialize)
REGISTER_GLOBAL_MOCK_RETURN(amqpvalue_set_map_value, 0);
REGISTER_GLOBAL_MOCK_FAIL_RETURN(amqpvalue_set_map_value, 1);

REGISTER_GLOBAL_MOCK_RETURN(amqpvalue_get_symbol, 0);
REGISTER_GLOBAL_MOCK_RETURN(amqpvalue_get_list_item_count, 0);
REGISTER_GLOBAL_MOCK_RETURN(amqpvalue_get_list_item, 0);
REGISTER_GLOBAL_MOCK_RETURN(amqpvalue_get_inplace_described_value, 0);

REGISTER_GLOBAL_MOCK_RETURN(link_set_attach_properties, 0);
REGISTER_GLOBAL_MOCK_FAIL_RETURN(link_set_attach_properties, 1);

Expand Down Expand Up @@ -2757,7 +2774,7 @@ static void test_send_events_for_callbacks(MESSAGE_SEND_RESULT message_send_resu
crank_telemetry_messenger_do_work(handle, mdwp);

umock_c_reset_all_calls();
set_expected_calls_for_on_message_send_complete(test_config->number_test_events);
set_expected_calls_for_on_message_send_complete(test_config->number_test_events, message_send_result);

// act
ASSERT_IS_NOT_NULL(saved_messagesender_send_on_message_send_complete);
Expand Down