From eb4516aa35973a117805edc3f47ef66f7d3e31b6 Mon Sep 17 00:00:00 2001 From: Eric Wolz Date: Wed, 24 Mar 2021 15:49:08 -0700 Subject: [PATCH] [LONGHAUL] Use connection state change to reconnect the AMQP service connection (#1917) * [LONGHAUL] Use connection state change to reconnect the AMQP service connection * PR feedback --- testtools/iothub_test/src/iothubtest.c | 54 +++++++++++++++----------- 1 file changed, 31 insertions(+), 23 deletions(-) diff --git a/testtools/iothub_test/src/iothubtest.c b/testtools/iothub_test/src/iothubtest.c index ab0ae9f234..1b15dc6415 100644 --- a/testtools/iothub_test/src/iothubtest.c +++ b/testtools/iothub_test/src/iothubtest.c @@ -108,6 +108,7 @@ const char* AMQP_SEND_AUTHCID_FMT = "iothubowner@sas.root.%s"; #define MAX_DRAIN_TIME 1000.0 #define MAX_SHORT_VALUE 32767 /* maximum (signed) short value */ #define INDEFINITE_TIME ((time_t)-1) +#define CONNECTION_2_MIN_TIMEOUT (2 * 60 * 1000) MU_DEFINE_ENUM_STRINGS_WITHOUT_INVALID(IOTHUB_TEST_CLIENT_RESULT, IOTHUB_TEST_CLIENT_RESULT_VALUES); @@ -671,7 +672,7 @@ static AMQP_VALUE on_message_received_new(const void* context, MESSAGE_HANDLE me AMQP_VALUE result; IOTHUB_VALIDATION_INFO* devhubValInfo = (IOTHUB_VALIDATION_INFO*)context; - + if (devhubValInfo->onMessageReceivedCallback == NULL) { result = messaging_delivery_released(); @@ -925,14 +926,15 @@ static AMQP_VALUE create_link_source(char* receive_address, filter_set filter_se return result; } -static void on_message_receiver_state_changed(const void* context, MESSAGE_RECEIVER_STATE new_state, MESSAGE_RECEIVER_STATE previous_state) +static void on_connection_state_changed(void* context, CONNECTION_STATE new_connection_state, CONNECTION_STATE previous_connection_state) { - (void)previous_state; - if (new_state == MESSAGE_RECEIVER_STATE_ERROR) + IOTHUB_VALIDATION_INFO* devhubValInfo = (IOTHUB_VALIDATION_INFO*)context; + if (devhubValInfo->isEventListenerConnected && + (new_connection_state == CONNECTION_STATE_END || new_connection_state == CONNECTION_STATE_ERROR) && + (previous_connection_state == CONNECTION_STATE_OPENED)) { - IOTHUB_VALIDATION_INFO* devhubValInfo = (IOTHUB_VALIDATION_INFO*) context; devhubValInfo->isEventListenerConnected = false; - LogInfo("Message receiver state: MESSAGE_RECEIVER_STATE_ERROR"); + LogInfo("AMQP connection state: CONNECTION_STATE_END"); } } @@ -983,25 +985,25 @@ static AMQP_CONN_INFO* createAmqpConnection(IOTHUB_VALIDATION_INFO* devhubValInf if ((sasl_plain_interface_description = saslplain_get_interface()) == NULL) { LogError("Failed getting saslplain_get_interface."); - destroyAmqpConnection(devhubValInfo->amqp_connection); + destroyAmqpConnection(result); result = NULL; } else if ((result->sasl_mechanism = saslmechanism_create(sasl_plain_interface_description, &sasl_plain_config)) == NULL) { LogError("Failed creating sasl PLAN mechanism."); - destroyAmqpConnection(devhubValInfo->amqp_connection); + destroyAmqpConnection(result); result = NULL; } else if ((tlsio_interface = platform_get_default_tlsio()) == NULL) { LogError("Failed getting default TLS IO interface."); - destroyAmqpConnection(devhubValInfo->amqp_connection); + destroyAmqpConnection(result); result = NULL; } else if ((result->tls_io = xio_create(tlsio_interface, &tls_io_config)) == NULL) { LogError("Failed creating the TLS IO."); - destroyAmqpConnection(devhubValInfo->amqp_connection); + destroyAmqpConnection(result); result = NULL; } else @@ -1013,27 +1015,33 @@ static AMQP_CONN_INFO* createAmqpConnection(IOTHUB_VALIDATION_INFO* devhubValInf if ((result->sasl_io = xio_create(saslclientio_get_interface_description(), &sasl_io_config)) == NULL) { LogError("Failed creating the SASL IO."); - destroyAmqpConnection(devhubValInfo->amqp_connection); + destroyAmqpConnection(result); result = NULL; } /* create the connection, session and link */ - else if ((result->connection = connection_create(result->sasl_io, eh_hostname, "e2etest_link", NULL, NULL)) == NULL) + else if ((result->connection = connection_create2(result->sasl_io, eh_hostname, "e2etest_link", NULL, NULL, on_connection_state_changed, devhubValInfo, NULL, NULL)) == NULL) { LogError("Failed creating the connection."); - destroyAmqpConnection(devhubValInfo->amqp_connection); + destroyAmqpConnection(result); + result = NULL; + } + else if (connection_set_idle_timeout(result->connection, CONNECTION_2_MIN_TIMEOUT)) + { + LogError("Failed setting the idle timeout."); + destroyAmqpConnection(result); result = NULL; } else if ((result->session = session_create(result->connection, NULL, NULL)) == NULL) { LogError("Failed creating the session."); - destroyAmqpConnection(devhubValInfo->amqp_connection); + destroyAmqpConnection(result); result = NULL; } else if (session_set_incoming_window(result->session, 100) != 0) { /* set incoming window to 100 for the session */ LogError("Failed setting the session incoming window."); - destroyAmqpConnection(devhubValInfo->amqp_connection); + destroyAmqpConnection(result); result = NULL; } else @@ -1044,7 +1052,7 @@ static AMQP_CONN_INFO* createAmqpConnection(IOTHUB_VALIDATION_INFO* devhubValInf if ((filter_set = create_link_source_filter(receiveTimeRangeStart)) == NULL) { LogError("Failed creating filter set with enqueuedtimeutc filter."); - destroyAmqpConnection(devhubValInfo->amqp_connection); + destroyAmqpConnection(result); result = NULL; } else @@ -1054,7 +1062,7 @@ static AMQP_CONN_INFO* createAmqpConnection(IOTHUB_VALIDATION_INFO* devhubValInf if ((source = create_link_source(receive_address, filter_set)) == NULL) { LogError("Failed creating source for link."); - destroyAmqpConnection(devhubValInfo->amqp_connection); + destroyAmqpConnection(result); result = NULL; } else @@ -1064,7 +1072,7 @@ static AMQP_CONN_INFO* createAmqpConnection(IOTHUB_VALIDATION_INFO* devhubValInf if ((target = messaging_create_target(receive_address)) == NULL) { LogError("Failed creating target for link."); - destroyAmqpConnection(devhubValInfo->amqp_connection); + destroyAmqpConnection(result); result = NULL; } else @@ -1072,25 +1080,25 @@ static AMQP_CONN_INFO* createAmqpConnection(IOTHUB_VALIDATION_INFO* devhubValInf if ((result->receive_link = link_create(result->session, "receiver-link", role_receiver, source, target)) == NULL) { LogError("Failed creating link."); - destroyAmqpConnection(devhubValInfo->amqp_connection); + destroyAmqpConnection(result); result = NULL; } else if (link_set_rcv_settle_mode(result->receive_link, receiver_settle_mode_first) != 0) { LogError("Failed setting link receive settle mode."); - destroyAmqpConnection(devhubValInfo->amqp_connection); + destroyAmqpConnection(result); result = NULL; } - else if ((result->message_receiver = messagereceiver_create(result->receive_link, on_message_receiver_state_changed, devhubValInfo)) == NULL) + else if ((result->message_receiver = messagereceiver_create(result->receive_link, NULL, NULL)) == NULL) { LogError("Failed creating message receiver."); - destroyAmqpConnection(devhubValInfo->amqp_connection); + destroyAmqpConnection(result); result = NULL; } else if (messagereceiver_open(result->message_receiver, on_message_received_new, devhubValInfo) != 0) { LogError("Failed opening message receiver."); - destroyAmqpConnection(devhubValInfo->amqp_connection); + destroyAmqpConnection(result); result = NULL; }