Skip to content

Commit

Permalink
Adding delete signaling channel API (#346)
Browse files Browse the repository at this point in the history
* Adding delete signaling channel API
Adding tests and fixing some of the existing tests

* Adding TSAN suppression for the new test path
  • Loading branch information
MushMal authored Apr 14, 2020
1 parent f809a78 commit 909277d
Show file tree
Hide file tree
Showing 13 changed files with 644 additions and 79 deletions.
48 changes: 37 additions & 11 deletions src/include/com/amazonaws/kinesis/video/webrtcclient/Include.h
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,7 @@ extern "C" {
#define STATUS_SIGNALING_INVALID_MESSAGE_TTL_VALUE STATUS_SIGNALING_BASE + 0x0000002E
#define STATUS_SIGNALING_ICE_CONFIG_REFRESH_FAILED STATUS_SIGNALING_BASE + 0x0000002F
#define STATUS_SIGNALING_RECONNECT_FAILED STATUS_SIGNALING_BASE + 0x00000030
#define STATUS_SIGNALING_DELETE_CALL_FAILED STATUS_SIGNALING_BASE + 0x00000031

/*!@} */
/*===========================================================================================*/
Expand Down Expand Up @@ -532,6 +533,11 @@ extern "C" {
*/
#define SIGNALING_SEND_TIMEOUT (5 * HUNDREDS_OF_NANOS_IN_A_SECOND)

/**
* Default timeout for deleting a channel
*/
#define SIGNALING_DELETE_TIMEOUT (5 * HUNDREDS_OF_NANOS_IN_A_SECOND)

/**
* Default signaling message alive time
*/
Expand Down Expand Up @@ -782,6 +788,8 @@ typedef enum {
//!< we get to this state after ICE refresh
SIGNALING_CLIENT_STATE_CONNECTED, //!< On transitioning to this state, the timeout on the state machine is reset
SIGNALING_CLIENT_STATE_DISCONNECTED, //!< This state transition happens either from connect or connected state
SIGNALING_CLIENT_STATE_DELETE, //!< This state transition happens when the application calls signalingClientDeleteSync API.
SIGNALING_CLIENT_STATE_DELETED, //!< This state transition happens after the channel gets deleted as a result of a signalingClientDeleteSync API. This is a terminal state.
SIGNALING_CLIENT_STATE_MAX_VALUE, //!< This state indicates maximum number of signaling client states
} SIGNALING_CLIENT_STATE, *PSIGNALING_CLIENT_STATE;

Expand Down Expand Up @@ -1557,45 +1565,63 @@ PUBLIC_API STATUS signalingClientGetIceConfigInfoCount(SIGNALING_CLIENT_HANDLE,
* IMPORTANT: The returned pointer to the ICE configuration information object points to internal structures
* and its contents should not be modified.
*
* @param SIGNALING_CLIENT_HANDLE
* @param[in] SIGNALING_CLIENT_HANDLE Signaling client handle
* @param[in] UINT32 Index of the ICE configuration information object to retrieve
* @param[out] PIceConfigInfo The pointer to the ICE configuration information object
*
* @return STATUS code of execution. STATUS_SUCCESS on success
*/
PUBLIC_API STATUS signalingClientGetIceConfigInfo(SIGNALING_CLIENT_HANDLE, UINT32, PIceConfigInfo*);


/**
* @brief Connects the signaling client to the socket in order to send/receive messages.
*
* NOTE: The call will succeed only when the signaling client is in a ready state.
* @param SIGNALING_CLIENT_HANDLE
*
* @return STATUS function execution status. STATUS_SUCCESS on success
* @param[in] SIGNALING_CLIENT_HANDLE Signaling client handle
*
* @return STATUS code of the execution. STATUS_SUCCESS on success
*/
PUBLIC_API STATUS signalingClientConnectSync(SIGNALING_CLIENT_HANDLE);

/*
* Gets the Signaling client current state.
* @brief Gets the Signaling client current state.
*
* @param - SIGNALING_CLIENT_HANDLE - IN - Signaling client handle
* @param - PSIGNALING_CLIENT_STATE - OUT - Current state of the signaling client as an UINT32 enum
* @param[in] SIGNALING_CLIENT_HANDLE Signaling client handle
* @param[out] PSIGNALING_CLIENT_STATE Current state of the signaling client as an UINT32 enum
*
* @return - STATUS code of the execution
* @return STATUS code of the execution. STATUS_SUCCESS on success
*/
PUBLIC_API STATUS signalingClientGetCurrentState(SIGNALING_CLIENT_HANDLE, PSIGNALING_CLIENT_STATE);

/*
* Gets a literal string representing a Signaling client state.
*
* @param - SIGNALING_CLIENT_STATE - IN - Signaling client state
* @param - PCHAR* - OUT - Read only string representing the state
* @param[in] SIGNALING_CLIENT_HANDLE Signaling client handle
* @param[out] PCHAR* Read only string representing the state
*
* @return - STATUS code of the execution
* @return STATUS code of the execution. STATUS_SUCCESS on success
*/
PUBLIC_API STATUS signalingClientGetStateString(SIGNALING_CLIENT_STATE, PCHAR*);

/**
* @brief Deletes the signaling channel referenced by SIGNALING_CLIENT_HANDLE
*
* NOTE: The function is intended to be used to clean up the backend resources and
* as such should be called at the end of the lifecycle of the signaling channel resource.
* Attempting to connect to the channel or send a message will result in an
* error or an unpredictable results after this call.
*
* NOTE: The call transitions the signaling client state machine to a terminal state
* even if the call fails. The proper handling on success and on an error is to
* free the signaling client.
*
* @param[in] SIGNALING_CLIENT_HANDLE Signaling client handle
*
* @return STATUS code of the execution. STATUS_SUCCESS on success
*/
PUBLIC_API STATUS signalingClientDeleteSync(SIGNALING_CLIENT_HANDLE);

#ifdef __cplusplus
}
#endif
Expand Down
24 changes: 24 additions & 0 deletions src/source/Signaling/Client.c
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,22 @@ STATUS signalingClientConnectSync(SIGNALING_CLIENT_HANDLE signalingClientHandle)
return retStatus;
}

STATUS signalingClientDeleteSync(SIGNALING_CLIENT_HANDLE signalingClientHandle)
{
ENTERS();
STATUS retStatus = STATUS_SUCCESS;
PSignalingClient pSignalingClient = FROM_SIGNALING_CLIENT_HANDLE(signalingClientHandle);

DLOGI("Signaling Client Delete Sync");

CHK_STATUS(signalingDeleteSync(pSignalingClient));

CleanUp:

LEAVES();
return retStatus;
}

STATUS signalingClientGetIceConfigInfoCount(SIGNALING_CLIENT_HANDLE signalingClientHandle, PUINT32 pIceConfigCount)
{
ENTERS();
Expand Down Expand Up @@ -193,6 +209,14 @@ STATUS signalingClientGetStateString(SIGNALING_CLIENT_STATE state, PCHAR* ppStat
*ppStateStr = SIGNALING_CLIENT_STATE_DISCONNECTED_STR;
break;

case SIGNALING_CLIENT_STATE_DELETE:
*ppStateStr = SIGNALING_CLIENT_STATE_DELETE_STR;
break;

case SIGNALING_CLIENT_STATE_DELETED:
*ppStateStr = SIGNALING_CLIENT_STATE_DELETED_STR;
break;

case SIGNALING_CLIENT_STATE_MAX_VALUE:
case SIGNALING_CLIENT_STATE_UNKNOWN:
// Explicit fall-through
Expand Down
17 changes: 8 additions & 9 deletions src/source/Signaling/LwsApiCalls.c
Original file line number Diff line number Diff line change
Expand Up @@ -1121,6 +1121,7 @@ STATUS deleteChannelLws(PSignalingClient pSignalingClient, UINT64 time)
CHAR url[MAX_URI_CHAR_LEN + 1];
CHAR paramsJson[MAX_JSON_PARAMETER_STRING_LEN];
PLwsCallInfo pLwsCallInfo = NULL;
SIZE_T result;

CHK(pSignalingClient != NULL, STATUS_NULL_ARG);
CHK(pSignalingClient->channelDescription.channelArn[0] != '\0', STATUS_INVALID_OPERATION);
Expand Down Expand Up @@ -1167,8 +1168,13 @@ STATUS deleteChannelLws(PSignalingClient pSignalingClient, UINT64 time)
// Set the service call result
ATOMIC_STORE(&pSignalingClient->result, (SIZE_T) pLwsCallInfo->callInfo.callResult);

// Early return if we have a non-success result
CHK((SERVICE_CALL_RESULT) ATOMIC_LOAD(&pSignalingClient->result) == SERVICE_CALL_RESULT_OK, retStatus);
// Early return if we have a non-success result and it's not a resource not found
result = ATOMIC_LOAD(&pSignalingClient->result);
CHK((SERVICE_CALL_RESULT) result == SERVICE_CALL_RESULT_OK ||
(SERVICE_CALL_RESULT) result == SERVICE_CALL_RESOURCE_NOT_FOUND, retStatus);

// Mark the channel as deleted
ATOMIC_STORE_BOOL(&pSignalingClient->deleted, TRUE);

CleanUp:

Expand Down Expand Up @@ -1879,13 +1885,6 @@ STATUS terminateLwsListenerLoop(PSignalingClient pSignalingClient)
terminateConnectionWithStatus(pSignalingClient, SERVICE_CALL_RESULT_OK);
}

if (pSignalingClient->pLwsContext != NULL) {
MUTEX_LOCK(pSignalingClient->lwsSerializerLock);
lws_context_destroy(pSignalingClient->pLwsContext);
pSignalingClient->pLwsContext = NULL;
MUTEX_UNLOCK(pSignalingClient->lwsSerializerLock);
}

CleanUp:

LEAVES();
Expand Down
71 changes: 64 additions & 7 deletions src/source/Signaling/Signaling.c
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,8 @@ STATUS createSignalingSync(PSignalingClientInfoInternal pClientInfo, PChannelInf
ATOMIC_STORE_BOOL(&pSignalingClient->clientReady, FALSE);
ATOMIC_STORE_BOOL(&pSignalingClient->shutdown, FALSE);
ATOMIC_STORE_BOOL(&pSignalingClient->connected, FALSE);
ATOMIC_STORE_BOOL(&pSignalingClient->deleting, FALSE);
ATOMIC_STORE_BOOL(&pSignalingClient->deleted, FALSE);

// Add to the signal handler
// signal(SIGINT, lwsSignalHandler);
Expand Down Expand Up @@ -179,13 +181,14 @@ STATUS freeSignaling(PSignalingClient* ppSignalingClient)

ATOMIC_STORE_BOOL(&pSignalingClient->shutdown, TRUE);

timerQueueFree(&pSignalingClient->timerQueueHandle);

// Terminate the listener thread if alive
terminateLwsListenerLoop(pSignalingClient);
terminateOngoingOperations(pSignalingClient);

// Await for the reconnect thread to exit
awaitForThreadTermination(&pSignalingClient->reconnecterTracker, SIGNALING_CLIENT_SHUTDOWN_TIMEOUT);
if (pSignalingClient->pLwsContext != NULL) {
MUTEX_LOCK(pSignalingClient->lwsSerializerLock);
lws_context_destroy(pSignalingClient->pLwsContext);
pSignalingClient->pLwsContext = NULL;
MUTEX_UNLOCK(pSignalingClient->lwsSerializerLock);
}

freeStateMachine(pSignalingClient->pStateMachine);

Expand Down Expand Up @@ -246,11 +249,34 @@ STATUS freeSignaling(PSignalingClient* ppSignalingClient)
return retStatus;
}

STATUS terminateOngoingOperations(PSignalingClient pSignalingClient)
{
ENTERS();
STATUS retStatus = STATUS_SUCCESS;

CHK(pSignalingClient != NULL, STATUS_NULL_ARG);

timerQueueFree(&pSignalingClient->timerQueueHandle);

// Terminate the listener thread if alive
terminateLwsListenerLoop(pSignalingClient);

// Await for the reconnect thread to exit
awaitForThreadTermination(&pSignalingClient->reconnecterTracker, SIGNALING_CLIENT_SHUTDOWN_TIMEOUT);

CleanUp:

CHK_LOG_ERR(retStatus);

LEAVES();
return retStatus;
}

STATUS signalingSendMessageSync(PSignalingClient pSignalingClient, PSignalingMessage pSignalingMessage)
{
ENTERS();
STATUS retStatus = STATUS_SUCCESS;
PCHAR pOfferType;
PCHAR pOfferType = NULL;
BOOL removeFromList = FALSE;

CHK(pSignalingClient != NULL && pSignalingMessage != NULL, STATUS_NULL_ARG);
Expand Down Expand Up @@ -374,6 +400,37 @@ STATUS signalingConnectSync(PSignalingClient pSignalingClient)
return retStatus;
}

STATUS signalingDeleteSync(PSignalingClient pSignalingClient)
{
ENTERS();
STATUS retStatus = STATUS_SUCCESS;

CHK(pSignalingClient != NULL, STATUS_NULL_ARG);

// Check if we are already deleting
CHK (!ATOMIC_LOAD_BOOL(&pSignalingClient->deleted), retStatus);

// Mark as being deleted
ATOMIC_STORE_BOOL(&pSignalingClient->deleting, TRUE);

CHK_STATUS(terminateOngoingOperations(pSignalingClient));

// Set the state directly
setStateMachineCurrentState(pSignalingClient->pStateMachine, SIGNALING_STATE_DELETE);

// Set the time out before execution
pSignalingClient->stepUntil = GETTIME() + SIGNALING_DELETE_TIMEOUT;

CHK_STATUS(stepSignalingStateMachine(pSignalingClient, retStatus));

CleanUp:

CHK_LOG_ERR(retStatus);

LEAVES();
return retStatus;
}

STATUS validateSignalingCallbacks(PSignalingClient pSignalingClient, PSignalingClientCallbacks pCallbacks)
{
ENTERS();
Expand Down
13 changes: 13 additions & 0 deletions src/source/Signaling/Signaling.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ extern "C" {
#define SIGNALING_CLIENT_STATE_CONNECTING_STR "Connecting"
#define SIGNALING_CLIENT_STATE_CONNECTED_STR "Connected"
#define SIGNALING_CLIENT_STATE_DISCONNECTED_STR "Disconnected"
#define SIGNALING_CLIENT_STATE_DELETE_STR "Delete"
#define SIGNALING_CLIENT_STATE_DELETED_STR "Deleted"

// Error refreshing ICE server configuration string
#define SIGNALING_ICE_CONFIG_REFRESH_ERROR_MSG "Failed refreshing ICE server configuration with status code 0x%08x."
Expand Down Expand Up @@ -82,6 +84,8 @@ typedef struct {
SignalingApiCallHookFunc getIceConfigPostHookFn;
SignalingApiCallHookFunc connectPreHookFn;
SignalingApiCallHookFunc connectPostHookFn;
SignalingApiCallHookFunc deletePreHookFn;
SignalingApiCallHookFunc deletePostHookFn;
} SignalingClientInfoInternal, *PSignalingClientInfoInternal;

/**
Expand Down Expand Up @@ -113,6 +117,12 @@ typedef struct {
// Wss is connected
volatile ATOMIC_BOOL connected;

// The channel is being deleted
volatile ATOMIC_BOOL deleting;

// The channel is deleted
volatile ATOMIC_BOOL deleted;

// Current version of the structure
UINT32 version;

Expand Down Expand Up @@ -221,6 +231,7 @@ STATUS signalingSendMessageSync(PSignalingClient, PSignalingMessage);
STATUS signalingGetIceConfigInfoCout(PSignalingClient, PUINT32);
STATUS signalingGetIceConfigInfo(PSignalingClient, UINT32, PIceConfigInfo*);
STATUS signalingConnectSync(PSignalingClient);
STATUS signalingDeleteSync(PSignalingClient);

STATUS validateSignalingCallbacks(PSignalingClient, PSignalingClientCallbacks);
STATUS validateSignalingClientInfo(PSignalingClient, PSignalingClientInfoInternal);
Expand All @@ -236,6 +247,8 @@ STATUS awaitForThreadTermination(PThreadTracker, UINT64);
STATUS initializeThreadTracker(PThreadTracker);
STATUS uninitializeThreadTracker(PThreadTracker);

STATUS terminateOngoingOperations(PSignalingClient);

#ifdef __cplusplus
}
#endif
Expand Down
Loading

0 comments on commit 909277d

Please sign in to comment.