Skip to content

Commit

Permalink
[LONGHAUL] Use connection state change to reconnect the AMQP service …
Browse files Browse the repository at this point in the history
…connection (#1917)

* [LONGHAUL] Use connection state change to reconnect the AMQP service connection

* PR feedback
  • Loading branch information
ericwolz authored Mar 24, 2021
1 parent 08416ca commit eb4516a
Showing 1 changed file with 31 additions and 23 deletions.
54 changes: 31 additions & 23 deletions testtools/iothub_test/src/iothubtest.c
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ const char* AMQP_SEND_AUTHCID_FMT = "iothubowner@sas.root.%s";
#define MAX_DRAIN_TIME 1000.0
#define MAX_SHORT_VALUE 32767 /* maximum (signed) short value */
#define INDEFINITE_TIME ((time_t)-1)
#define CONNECTION_2_MIN_TIMEOUT (2 * 60 * 1000)

MU_DEFINE_ENUM_STRINGS_WITHOUT_INVALID(IOTHUB_TEST_CLIENT_RESULT, IOTHUB_TEST_CLIENT_RESULT_VALUES);

Expand Down Expand Up @@ -671,7 +672,7 @@ static AMQP_VALUE on_message_received_new(const void* context, MESSAGE_HANDLE me
AMQP_VALUE result;

IOTHUB_VALIDATION_INFO* devhubValInfo = (IOTHUB_VALIDATION_INFO*)context;

if (devhubValInfo->onMessageReceivedCallback == NULL)
{
result = messaging_delivery_released();
Expand Down Expand Up @@ -925,14 +926,15 @@ static AMQP_VALUE create_link_source(char* receive_address, filter_set filter_se
return result;
}

static void on_message_receiver_state_changed(const void* context, MESSAGE_RECEIVER_STATE new_state, MESSAGE_RECEIVER_STATE previous_state)
static void on_connection_state_changed(void* context, CONNECTION_STATE new_connection_state, CONNECTION_STATE previous_connection_state)
{
(void)previous_state;
if (new_state == MESSAGE_RECEIVER_STATE_ERROR)
IOTHUB_VALIDATION_INFO* devhubValInfo = (IOTHUB_VALIDATION_INFO*)context;
if (devhubValInfo->isEventListenerConnected &&
(new_connection_state == CONNECTION_STATE_END || new_connection_state == CONNECTION_STATE_ERROR) &&
(previous_connection_state == CONNECTION_STATE_OPENED))
{
IOTHUB_VALIDATION_INFO* devhubValInfo = (IOTHUB_VALIDATION_INFO*) context;
devhubValInfo->isEventListenerConnected = false;
LogInfo("Message receiver state: MESSAGE_RECEIVER_STATE_ERROR");
LogInfo("AMQP connection state: CONNECTION_STATE_END");
}
}

Expand Down Expand Up @@ -983,25 +985,25 @@ static AMQP_CONN_INFO* createAmqpConnection(IOTHUB_VALIDATION_INFO* devhubValInf
if ((sasl_plain_interface_description = saslplain_get_interface()) == NULL)
{
LogError("Failed getting saslplain_get_interface.");
destroyAmqpConnection(devhubValInfo->amqp_connection);
destroyAmqpConnection(result);
result = NULL;
}
else if ((result->sasl_mechanism = saslmechanism_create(sasl_plain_interface_description, &sasl_plain_config)) == NULL)
{
LogError("Failed creating sasl PLAN mechanism.");
destroyAmqpConnection(devhubValInfo->amqp_connection);
destroyAmqpConnection(result);
result = NULL;
}
else if ((tlsio_interface = platform_get_default_tlsio()) == NULL)
{
LogError("Failed getting default TLS IO interface.");
destroyAmqpConnection(devhubValInfo->amqp_connection);
destroyAmqpConnection(result);
result = NULL;
}
else if ((result->tls_io = xio_create(tlsio_interface, &tls_io_config)) == NULL)
{
LogError("Failed creating the TLS IO.");
destroyAmqpConnection(devhubValInfo->amqp_connection);
destroyAmqpConnection(result);
result = NULL;
}
else
Expand All @@ -1013,27 +1015,33 @@ static AMQP_CONN_INFO* createAmqpConnection(IOTHUB_VALIDATION_INFO* devhubValInf
if ((result->sasl_io = xio_create(saslclientio_get_interface_description(), &sasl_io_config)) == NULL)
{
LogError("Failed creating the SASL IO.");
destroyAmqpConnection(devhubValInfo->amqp_connection);
destroyAmqpConnection(result);
result = NULL;
}
/* create the connection, session and link */
else if ((result->connection = connection_create(result->sasl_io, eh_hostname, "e2etest_link", NULL, NULL)) == NULL)
else if ((result->connection = connection_create2(result->sasl_io, eh_hostname, "e2etest_link", NULL, NULL, on_connection_state_changed, devhubValInfo, NULL, NULL)) == NULL)
{
LogError("Failed creating the connection.");
destroyAmqpConnection(devhubValInfo->amqp_connection);
destroyAmqpConnection(result);
result = NULL;
}
else if (connection_set_idle_timeout(result->connection, CONNECTION_2_MIN_TIMEOUT))
{
LogError("Failed setting the idle timeout.");
destroyAmqpConnection(result);
result = NULL;
}
else if ((result->session = session_create(result->connection, NULL, NULL)) == NULL)
{
LogError("Failed creating the session.");
destroyAmqpConnection(devhubValInfo->amqp_connection);
destroyAmqpConnection(result);
result = NULL;
}
else if (session_set_incoming_window(result->session, 100) != 0)
{
/* set incoming window to 100 for the session */
LogError("Failed setting the session incoming window.");
destroyAmqpConnection(devhubValInfo->amqp_connection);
destroyAmqpConnection(result);
result = NULL;
}
else
Expand All @@ -1044,7 +1052,7 @@ static AMQP_CONN_INFO* createAmqpConnection(IOTHUB_VALIDATION_INFO* devhubValInf
if ((filter_set = create_link_source_filter(receiveTimeRangeStart)) == NULL)
{
LogError("Failed creating filter set with enqueuedtimeutc filter.");
destroyAmqpConnection(devhubValInfo->amqp_connection);
destroyAmqpConnection(result);
result = NULL;
}
else
Expand All @@ -1054,7 +1062,7 @@ static AMQP_CONN_INFO* createAmqpConnection(IOTHUB_VALIDATION_INFO* devhubValInf
if ((source = create_link_source(receive_address, filter_set)) == NULL)
{
LogError("Failed creating source for link.");
destroyAmqpConnection(devhubValInfo->amqp_connection);
destroyAmqpConnection(result);
result = NULL;
}
else
Expand All @@ -1064,33 +1072,33 @@ static AMQP_CONN_INFO* createAmqpConnection(IOTHUB_VALIDATION_INFO* devhubValInf
if ((target = messaging_create_target(receive_address)) == NULL)
{
LogError("Failed creating target for link.");
destroyAmqpConnection(devhubValInfo->amqp_connection);
destroyAmqpConnection(result);
result = NULL;
}
else
{
if ((result->receive_link = link_create(result->session, "receiver-link", role_receiver, source, target)) == NULL)
{
LogError("Failed creating link.");
destroyAmqpConnection(devhubValInfo->amqp_connection);
destroyAmqpConnection(result);
result = NULL;
}
else if (link_set_rcv_settle_mode(result->receive_link, receiver_settle_mode_first) != 0)
{
LogError("Failed setting link receive settle mode.");
destroyAmqpConnection(devhubValInfo->amqp_connection);
destroyAmqpConnection(result);
result = NULL;
}
else if ((result->message_receiver = messagereceiver_create(result->receive_link, on_message_receiver_state_changed, devhubValInfo)) == NULL)
else if ((result->message_receiver = messagereceiver_create(result->receive_link, NULL, NULL)) == NULL)
{
LogError("Failed creating message receiver.");
destroyAmqpConnection(devhubValInfo->amqp_connection);
destroyAmqpConnection(result);
result = NULL;
}
else if (messagereceiver_open(result->message_receiver, on_message_received_new, devhubValInfo) != 0)
{
LogError("Failed opening message receiver.");
destroyAmqpConnection(devhubValInfo->amqp_connection);
destroyAmqpConnection(result);
result = NULL;
}

Expand Down

0 comments on commit eb4516a

Please sign in to comment.