Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

State machine preparation is decoupled from the constructor #1343

Merged
merged 25 commits into from
Dec 10, 2021
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
f0c7d45
State machine preparation is decoupled from the constructor (getToken).
jdelapla Dec 3, 2021
10154b6
adding comment in header file
jdelapla Dec 3, 2021
888cf4c
Clang formatting
jdelapla Dec 3, 2021
dbaf0f7
Unit test fix and review comment updated
jdelapla Dec 3, 2021
801de60
Fixed bad auth test edge case
jdelapla Dec 7, 2021
1be1bed
Remove this check, it breaks static credentials
jdelapla Dec 7, 2021
d6ca619
Merge branch 'develop' into state_machine_constructor_decouple
jdelapla Dec 7, 2021
d000afb
kick
jdelapla Dec 7, 2021
4c80afd
Punch
jdelapla Dec 7, 2021
5e0183a
Clang format, even though it makes the state machine's struct look te…
jdelapla Dec 8, 2021
93c22f1
Merge branch 'develop' into state_machine_constructor_decouple
jdelapla Dec 8, 2021
df0c6ad
Merge branch 'develop' into state_machine_constructor_decouple
jdelapla Dec 8, 2021
8697ad7
Adjusted clock skew unit tests to use fetch. Updated fetch API to use…
jdelapla Dec 8, 2021
a8cc3b3
Removing part of this test. It was a forced failure that failed for d…
jdelapla Dec 9, 2021
fed1b18
allow clock skew test to have a timeout on refreshiceconfig once. The…
jdelapla Dec 9, 2021
8e30400
This test has a race condition. Putting a sleep until the author can …
jdelapla Dec 9, 2021
140942c
If the forced timeout on ConnectSync occurs on the CONNECTING_STATE, …
jdelapla Dec 9, 2021
0e2dc9c
* Unit test fix to account for acceptable timeouts when forcing failures
jdelapla Dec 9, 2021
2caa0c7
Continuing the chain of log fixes from PIC
jdelapla Dec 9, 2021
1e9247a
properly match format specifiers
hassanctech Dec 9, 2021
38eafd5
Sentinel value cannot be 0, it horribly ruins all tests where we want…
jdelapla Dec 10, 2021
673b8e4
Merge branch 'state_machine_constructor_decouple' of github.com:awsla…
jdelapla Dec 10, 2021
d32c406
signed and unsigned comparison fixed
jdelapla Dec 10, 2021
e43efa3
|| -> && on null check
jdelapla Dec 10, 2021
df6f93d
missing null check in Fetch
jdelapla Dec 10, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 3 additions & 6 deletions samples/Common.c
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ VOID onDataChannelMessage(UINT64 customData, PRtcDataChannel pDataChannel, BOOL
// Send a response to the message sent by the viewer
STATUS retStatus = STATUS_SUCCESS;
retStatus = dataChannelSend(pDataChannel, FALSE, (PBYTE) MASTER_DATA_CHANNEL_MESSAGE, STRLEN(MASTER_DATA_CHANNEL_MESSAGE));
if(retStatus != STATUS_SUCCESS) {
if (retStatus != STATUS_SUCCESS) {
DLOGI("[KVS Master] dataChannelSend(): operation returned status code: 0x%08x \n", retStatus);
}
}
Expand Down Expand Up @@ -1047,7 +1047,7 @@ STATUS freeSampleConfiguration(PSampleConfiguration* ppSampleConfiguration)
MUTEX_LOCK(pSampleConfiguration->sampleConfigurationObjLock);
locked = TRUE;
}

for (i = 0; i < pSampleConfiguration->streamingSessionCount; ++i) {
retStatus = gatherIceServerStats(pSampleConfiguration->sampleStreamingSessionList[i]);
if (STATUS_FAILED(retStatus)) {
Expand Down Expand Up @@ -1155,10 +1155,7 @@ STATUS sessionCleanupWait(PSampleConfiguration pSampleConfiguration)

// Check if we need to re-create the signaling client on-the-fly
if (ATOMIC_LOAD_BOOL(&pSampleConfiguration->recreateSignalingClient) &&
STATUS_SUCCEEDED(freeSignalingClient(&pSampleConfiguration->signalingClientHandle)) &&
STATUS_SUCCEEDED(createSignalingClientSync(&pSampleConfiguration->clientInfo, &pSampleConfiguration->channelInfo,
&pSampleConfiguration->signalingClientCallbacks, pSampleConfiguration->pCredentialProvider,
&pSampleConfiguration->signalingClientHandle))) {
STATUS_SUCCEEDED(signalingClientFetchSync(pSampleConfiguration->signalingClientHandle))) {
// Re-set the variable again
ATOMIC_STORE_BOOL(&pSampleConfiguration->recreateSignalingClient, FALSE);
}
Expand Down
12 changes: 9 additions & 3 deletions samples/kvsWebRTCClientMaster.c
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,12 @@ INT32 main(INT32 argc, CHAR* argv[])
printf("[KVS Master] Signaling client created successfully\n");

// Enable the processing of the messages
retStatus = signalingClientFetchSync(pSampleConfiguration->signalingClientHandle);
if (retStatus != STATUS_SUCCESS) {
printf("[KVS Master] signalingClientFetchSync(): operation returned status code: 0x%08x \n", retStatus);
goto CleanUp;
}

retStatus = signalingClientConnectSync(pSampleConfiguration->signalingClientHandle);
if (retStatus != STATUS_SUCCESS) {
printf("[KVS Master] signalingClientConnectSync(): operation returned status code: 0x%08x \n", retStatus);
Expand Down Expand Up @@ -286,7 +292,7 @@ PVOID sendVideoPackets(PVOID args)

CHK_LOG_ERR(retStatus);

return (PVOID) (ULONG_PTR) retStatus;
return (PVOID)(ULONG_PTR) retStatus;
}

PVOID sendAudioPackets(PVOID args)
Expand Down Expand Up @@ -354,7 +360,7 @@ PVOID sendAudioPackets(PVOID args)

CleanUp:

return (PVOID) (ULONG_PTR) retStatus;
return (PVOID)(ULONG_PTR) retStatus;
}

PVOID sampleReceiveVideoFrame(PVOID args)
Expand All @@ -374,5 +380,5 @@ PVOID sampleReceiveVideoFrame(PVOID args)

CleanUp:

return (PVOID) (ULONG_PTR) retStatus;
return (PVOID)(ULONG_PTR) retStatus;
}
10 changes: 10 additions & 0 deletions src/include/com/amazonaws/kinesis/video/webrtcclient/Include.h
Original file line number Diff line number Diff line change
Expand Up @@ -1923,6 +1923,16 @@ PUBLIC_API STATUS signalingClientGetIceConfigInfoCount(SIGNALING_CLIENT_HANDLE,
*/
PUBLIC_API STATUS signalingClientGetIceConfigInfo(SIGNALING_CLIENT_HANDLE, UINT32, PIceConfigInfo*);

/**
* @brief Fetches all assets needed to ready the state machine before attempting to connect.
* Can also be used to reallocate missing / expired assets before reconnecting.
*
* @param[in] SIGNALING_CLIENT_HANDLE Signaling client handle
*
* @return STATUS code of the execution. STATUS_SUCCESS on success
*/
PUBLIC_API STATUS signalingClientFetchSync(SIGNALING_CLIENT_HANDLE);

/**
* @brief Connects the signaling client to the web socket in order to send/receive messages.
*
Expand Down
19 changes: 18 additions & 1 deletion src/source/Signaling/Client.c
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,23 @@ STATUS signalingClientConnectSync(SIGNALING_CLIENT_HANDLE signalingClientHandle)
return retStatus;
}

STATUS signalingClientFetchSync(SIGNALING_CLIENT_HANDLE signalingClientHandle)
{
ENTERS();
STATUS retStatus = STATUS_SUCCESS;
PSignalingClient pSignalingClient = FROM_SIGNALING_CLIENT_HANDLE(signalingClientHandle);

DLOGV("Signaling Client Fetch Sync");

CHK_STATUS(signalingFetchSync(pSignalingClient));

CleanUp:

SIGNALING_UPDATE_ERROR_COUNT(pSignalingClient, retStatus);
LEAVES();
return retStatus;
}

STATUS signalingClientDisconnectSync(SIGNALING_CLIENT_HANDLE signalingClientHandle)
{
ENTERS();
Expand Down Expand Up @@ -129,7 +146,7 @@ STATUS signalingClientGetIceConfigInfoCount(SIGNALING_CLIENT_HANDLE signalingCli

DLOGV("Signaling Client Get ICE Config Info Count");

CHK_STATUS(signalingGetIceConfigInfoCout(pSignalingClient, pIceConfigCount));
CHK_STATUS(signalingGetIceConfigInfoCount(pSignalingClient, pIceConfigCount));

CleanUp:

Expand Down
44 changes: 32 additions & 12 deletions src/source/Signaling/Signaling.c
Original file line number Diff line number Diff line change
Expand Up @@ -163,9 +163,8 @@ STATUS createSignalingSync(PSignalingClientInfoInternal pClientInfo, PChannelInf
// Do not force ice config state
ATOMIC_STORE_BOOL(&pSignalingClient->refreshIceConfig, FALSE);

// Prime the state machine
CHK_STATUS(signalingStateMachineIterator(pSignalingClient, pSignalingClient->diagnostics.createTime + SIGNALING_CREATE_TIMEOUT,
SIGNALING_STATE_READY));
// We do not cache token in file system, so we will always have to retrieve one after creating the client.
CHK_STATUS(signalingStateMachineIterator(pSignalingClient, GETTIME() + SIGNALING_CONNECT_STATE_TIMEOUT, SIGNALING_STATE_GET_TOKEN));

CleanUp:

Expand Down Expand Up @@ -323,7 +322,7 @@ STATUS signalingSendMessageSync(PSignalingClient pSignalingClient, PSignalingMes
return retStatus;
}

STATUS signalingGetIceConfigInfoCout(PSignalingClient pSignalingClient, PUINT32 pIceConfigCount)
STATUS signalingGetIceConfigInfoCount(PSignalingClient pSignalingClient, PUINT32 pIceConfigCount)
{
ENTERS();
STATUS retStatus = STATUS_SUCCESS;
Expand Down Expand Up @@ -364,6 +363,29 @@ STATUS signalingGetIceConfigInfo(PSignalingClient pSignalingClient, UINT32 index
return retStatus;
}

STATUS signalingFetchSync(PSignalingClient pSignalingClient)
{
ENTERS();
STATUS retStatus = STATUS_SUCCESS;

CHK(pSignalingClient != NULL, STATUS_NULL_ARG);

// Check if we are already not connected
if (ATOMIC_LOAD_BOOL(&pSignalingClient->connected)) {
CHK_STATUS(terminateOngoingOperations(pSignalingClient));
}

// move to the fromGetToken() so we can move to the necessary step
setStateMachineCurrentState(pSignalingClient->pStateMachine, SIGNALING_STATE_GET_TOKEN);
jdelapla marked this conversation as resolved.
Show resolved Hide resolved
CHK_STATUS(signalingStateMachineIterator(pSignalingClient, GETTIME() + SIGNALING_CONNECT_STATE_TIMEOUT, SIGNALING_STATE_READY));

CleanUp:

CHK_LOG_ERR(retStatus);
LEAVES();
return retStatus;
}

STATUS signalingConnectSync(PSignalingClient pSignalingClient)
{
ENTERS();
Expand All @@ -373,17 +395,16 @@ STATUS signalingConnectSync(PSignalingClient pSignalingClient)
CHK(pSignalingClient != NULL, STATUS_NULL_ARG);

// Validate the state
CHK_STATUS(acceptStateMachineState(pSignalingClient->pStateMachine,
SIGNALING_STATE_READY | SIGNALING_STATE_CONNECT | SIGNALING_STATE_DISCONNECTED | SIGNALING_STATE_CONNECTED));
CHK_STATUS(acceptSignalingStateMachineState(
pSignalingClient, SIGNALING_STATE_READY | SIGNALING_STATE_CONNECT | SIGNALING_STATE_DISCONNECTED | SIGNALING_STATE_CONNECTED));

// Check if we are already connected
CHK(!ATOMIC_LOAD_BOOL(&pSignalingClient->connected), retStatus);

// Store the signaling state in case we error/timeout so we can re-set it on exit
CHK_STATUS(getStateMachineCurrentState(pSignalingClient->pStateMachine, &pState));

CHK_STATUS(signalingStateMachineIterator(pSignalingClient, GETTIME() + SIGNALING_CONNECT_STATE_TIMEOUT,
SIGNALING_STATE_CONNECTED));
CHK_STATUS(signalingStateMachineIterator(pSignalingClient, GETTIME() + SIGNALING_CONNECT_STATE_TIMEOUT, SIGNALING_STATE_CONNECTED));

CleanUp:

Expand Down Expand Up @@ -567,8 +588,8 @@ STATUS refreshIceConfiguration(PSignalingClient pSignalingClient)
CHK(pSignalingClient->iceConfigCount == 0 || curTime > pSignalingClient->iceConfigExpiration, retStatus);

// ICE config can be retrieved in specific states only
CHK_STATUS(acceptStateMachineState(pSignalingClient->pStateMachine,
SIGNALING_STATE_READY | SIGNALING_STATE_CONNECT | SIGNALING_STATE_CONNECTED | SIGNALING_STATE_DISCONNECTED));
CHK_STATUS(acceptSignalingStateMachineState(
pSignalingClient, SIGNALING_STATE_READY | SIGNALING_STATE_CONNECT | SIGNALING_STATE_CONNECTED | SIGNALING_STATE_DISCONNECTED));

MUTEX_LOCK(pSignalingClient->stateLock);
locked = TRUE;
Expand All @@ -581,8 +602,7 @@ STATUS refreshIceConfiguration(PSignalingClient pSignalingClient)

// Iterate the state machinery in steady states only - ready or connected
if (pStateMachineState->state == SIGNALING_STATE_READY || pStateMachineState->state == SIGNALING_STATE_CONNECTED) {
CHK_STATUS(signalingStateMachineIterator(pSignalingClient, curTime + SIGNALING_REFRESH_ICE_CONFIG_STATE_TIMEOUT,
pStateMachineState->state));
CHK_STATUS(signalingStateMachineIterator(pSignalingClient, curTime + SIGNALING_REFRESH_ICE_CONFIG_STATE_TIMEOUT, pStateMachineState->state));
}

CleanUp:
Expand Down
11 changes: 6 additions & 5 deletions src/source/Signaling/Signaling.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ extern "C" {
#define SIGNALING_REQUEST_ID_HEADER_NAME KVS_REQUEST_ID_HEADER_NAME ":"

// Signaling client from custom data conversion
#define SIGNALING_CLIENT_FROM_CUSTOM_DATA(h) ((PSignalingClient) (h))
#define CUSTOM_DATA_FROM_SIGNALING_CLIENT(p) ((UINT64) (p))
#define SIGNALING_CLIENT_FROM_CUSTOM_DATA(h) ((PSignalingClient)(h))
#define CUSTOM_DATA_FROM_SIGNALING_CLIENT(p) ((UINT64)(p))

// Grace period for refreshing the ICE configuration
#define ICE_CONFIGURATION_REFRESH_GRACE_PERIOD (30 * HUNDREDS_OF_NANOS_IN_A_SECOND)
Expand Down Expand Up @@ -282,15 +282,16 @@ typedef struct {
} SignalingClient, *PSignalingClient;

// Public handle to and from object converters
#define TO_SIGNALING_CLIENT_HANDLE(p) ((SIGNALING_CLIENT_HANDLE) (p))
#define FROM_SIGNALING_CLIENT_HANDLE(h) (IS_VALID_SIGNALING_CLIENT_HANDLE(h) ? (PSignalingClient) (h) : NULL)
#define TO_SIGNALING_CLIENT_HANDLE(p) ((SIGNALING_CLIENT_HANDLE)(p))
#define FROM_SIGNALING_CLIENT_HANDLE(h) (IS_VALID_SIGNALING_CLIENT_HANDLE(h) ? (PSignalingClient)(h) : NULL)

STATUS createSignalingSync(PSignalingClientInfoInternal, PChannelInfo, PSignalingClientCallbacks, PAwsCredentialProvider, PSignalingClient*);
STATUS freeSignaling(PSignalingClient*);

STATUS signalingSendMessageSync(PSignalingClient, PSignalingMessage);
STATUS signalingGetIceConfigInfoCout(PSignalingClient, PUINT32);
STATUS signalingGetIceConfigInfoCount(PSignalingClient, PUINT32);
STATUS signalingGetIceConfigInfo(PSignalingClient, UINT32, PIceConfigInfo*);
STATUS signalingFetchSync(PSignalingClient);
STATUS signalingConnectSync(PSignalingClient);
STATUS signalingDisconnectSync(PSignalingClient);
STATUS signalingDeleteSync(PSignalingClient);
Expand Down
23 changes: 12 additions & 11 deletions src/source/Signaling/StateMachine.c
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ StateMachineState SIGNALING_STATE_MACHINE_STATES[] = {
fromGetTokenSignalingState, executeGetTokenSignalingState, SIGNALING_STATES_DEFAULT_RETRY_COUNT, STATUS_SIGNALING_GET_TOKEN_CALL_FAILED},
{SIGNALING_STATE_DESCRIBE,
SIGNALING_STATE_GET_TOKEN | SIGNALING_STATE_CREATE | SIGNALING_STATE_GET_ENDPOINT | SIGNALING_STATE_GET_ICE_CONFIG | SIGNALING_STATE_CONNECT |
SIGNALING_STATE_CONNECTED | SIGNALING_STATE_DELETE | SIGNALING_STATE_DESCRIBE,
SIGNALING_STATE_CONNECTED | SIGNALING_STATE_DELETE | SIGNALING_STATE_DESCRIBE | SIGNALING_STATE_READY | SIGNALING_STATE_DISCONNECTED,
fromDescribeSignalingState, executeDescribeSignalingState, SIGNALING_STATES_DEFAULT_RETRY_COUNT, STATUS_SIGNALING_DESCRIBE_CALL_FAILED},
{SIGNALING_STATE_CREATE, SIGNALING_STATE_DESCRIBE | SIGNALING_STATE_CREATE, fromCreateSignalingState, executeCreateSignalingState,
SIGNALING_STATES_DEFAULT_RETRY_COUNT, STATUS_SIGNALING_CREATE_CALL_FAILED},
Expand Down Expand Up @@ -60,7 +60,7 @@ STATUS signalingStateMachineIterator(PSignalingClient pSignalingClient, UINT64 e
MUTEX_LOCK(pSignalingClient->stateLock);
locked = TRUE;

while(TRUE) {
while (TRUE) {
CHK(pSignalingClient != NULL, STATUS_NULL_ARG);

CHK(!ATOMIC_LOAD_BOOL(&pSignalingClient->shutdown), retStatus);
Expand All @@ -79,8 +79,7 @@ STATUS signalingStateMachineIterator(PSignalingClient pSignalingClient, UINT64 e
// NOTE: Api Gateway might not return an error that can be interpreted as unauthorized to
// make the correct transition to auth integration state.
if (retStatus == STATUS_SERVICE_CALL_NOT_AUTHORIZED_ERROR ||
(SERVICE_CALL_UNKNOWN == (SERVICE_CALL_RESULT) ATOMIC_LOAD(&pSignalingClient->result) &&
pSignalingClient->pAwsCredentials->expiration < currentTime)) {
(pSignalingClient->pAwsCredentials != NULL && pSignalingClient->pAwsCredentials->expiration < currentTime)) {
jdelapla marked this conversation as resolved.
Show resolved Hide resolved
// Set the call status as auth error
ATOMIC_STORE(&pSignalingClient->result, (SIZE_T) SERVICE_CALL_NOT_AUTHORIZED);
}
Expand Down Expand Up @@ -273,6 +272,11 @@ STATUS executeGetTokenSignalingState(UINT64 customData, UINT64 time)
SIGNALING_CLIENT_STATE_GET_CREDENTIALS));
}

// reset credentials expiration if we already have one.
if (NULL != pSignalingClient->pAwsCredentials) {
pSignalingClient->pAwsCredentials->expiration = 0;
}

// Use the credential provider to get the token
retStatus = pSignalingClient->pCredentialProvider->getCredentialsFn(pSignalingClient->pCredentialProvider, &pSignalingClient->pAwsCredentials);

Expand Down Expand Up @@ -441,7 +445,6 @@ STATUS fromGetEndpointSignalingState(UINT64 customData, PUINT64 pState)
default:
break;
}

*pState = state;

CleanUp:
Expand Down Expand Up @@ -716,10 +719,11 @@ STATUS fromConnectedSignalingState(UINT64 customData, PUINT64 pState)
break;

case SERVICE_CALL_INTERNAL_ERROR:
state = SIGNALING_STATE_GET_ENDPOINT;
break;

case SERVICE_CALL_BAD_REQUEST:
case SERVICE_CALL_NETWORK_CONNECTION_TIMEOUT:
case SERVICE_CALL_NETWORK_READ_TIMEOUT:
case SERVICE_CALL_REQUEST_TIMEOUT:
case SERVICE_CALL_GATEWAY_TIMEOUT:
state = SIGNALING_STATE_GET_ENDPOINT;
break;

Expand Down Expand Up @@ -778,9 +782,6 @@ STATUS fromDisconnectedSignalingState(UINT64 customData, PUINT64 pState)

CHK(pSignalingClient != NULL && pState != NULL, STATUS_NULL_ARG);

// See if we need to retry first of all
CHK(pSignalingClient->pChannelInfo->reconnect, STATUS_SUCCESS);
jdelapla marked this conversation as resolved.
Show resolved Hide resolved

result = ATOMIC_LOAD(&pSignalingClient->result);
switch (result) {
case SERVICE_CALL_FORBIDDEN:
Expand Down
Loading