Skip to content

Commit

Permalink
Removing pending queue if session creation fails (#1061)
Browse files Browse the repository at this point in the history
* Removing pending queue if session creation fails

* Doing a more thorough job in tracking and cleaning
  • Loading branch information
MushMal authored Feb 2, 2021
1 parent dcb8f80 commit f3a34c2
Show file tree
Hide file tree
Showing 3 changed files with 178 additions and 48 deletions.
197 changes: 158 additions & 39 deletions samples/Common.c
Original file line number Diff line number Diff line change
Expand Up @@ -699,8 +699,7 @@ STATUS createSampleConfiguration(PCHAR channelName, SIGNALING_CHANNEL_ROLE_TYPE

pSampleConfiguration->iceUriCount = 0;

CHK_STATUS(hashTableCreateWithParams(SAMPLE_HASH_TABLE_BUCKET_COUNT, SAMPLE_HASH_TABLE_BUCKET_LENGTH,
&pSampleConfiguration->pPendingSignalingMessageForRemoteClient));
CHK_STATUS(stackQueueCreate(&pSampleConfiguration->pPendingSignalingMessageForRemoteClient));
CHK_STATUS(hashTableCreateWithParams(SAMPLE_HASH_TABLE_BUCKET_COUNT, SAMPLE_HASH_TABLE_BUCKET_LENGTH,
&pSampleConfiguration->pRtcPeerConnectionForRemoteClient));

Expand Down Expand Up @@ -838,30 +837,34 @@ STATUS getIceCandidatePairStatsCallback(UINT32 timerId, UINT64 currentTime, UINT
return retStatus;
}

STATUS freePendingSignalingMessageQueue(UINT64 customData, PHashEntry pHashEntry)
{
UNUSED_PARAM(customData);
PStackQueue pStackQueue = (PStackQueue) pHashEntry->value;
stackQueueClear(pStackQueue, TRUE);
stackQueueFree(pStackQueue);
return STATUS_SUCCESS;
}

STATUS freeSampleConfiguration(PSampleConfiguration* ppSampleConfiguration)
{
ENTERS();
STATUS retStatus = STATUS_SUCCESS;
PSampleConfiguration pSampleConfiguration;
UINT32 i;
UINT64 data;
StackQueueIterator iterator;
BOOL locked = FALSE;

CHK(ppSampleConfiguration != NULL, STATUS_NULL_ARG);
pSampleConfiguration = *ppSampleConfiguration;

CHK(pSampleConfiguration != NULL, retStatus);
hashTableIterateEntries(pSampleConfiguration->pPendingSignalingMessageForRemoteClient, (UINT64) NULL, freePendingSignalingMessageQueue);
hashTableClear(pSampleConfiguration->pPendingSignalingMessageForRemoteClient);
hashTableFree(pSampleConfiguration->pPendingSignalingMessageForRemoteClient);

if (pSampleConfiguration->pPendingSignalingMessageForRemoteClient != NULL) {
// Iterate and free all the pending queues
stackQueueGetIterator(pSampleConfiguration->pPendingSignalingMessageForRemoteClient, &iterator);
while (IS_VALID_ITERATOR(iterator)) {
stackQueueIteratorGetItem(iterator, &data);
stackQueueIteratorNext(&iterator);
freeMessageQueue((PPendingMessageQueue) data);
}

stackQueueClear(pSampleConfiguration->pPendingSignalingMessageForRemoteClient, FALSE);
stackQueueFree(pSampleConfiguration->pPendingSignalingMessageForRemoteClient);
pSampleConfiguration->pPendingSignalingMessageForRemoteClient = NULL;
}

hashTableClear(pSampleConfiguration->pRtcPeerConnectionForRemoteClient);
hashTableFree(pSampleConfiguration->pRtcPeerConnectionForRemoteClient);
Expand Down Expand Up @@ -989,6 +992,9 @@ STATUS sessionCleanupWait(PSampleConfiguration pSampleConfiguration)
}
}

// Check if any lingering pending message queues
CHK_STATUS(removeExpiredMessageQueues(pSampleConfiguration->pPendingSignalingMessageForRemoteClient));

// periodically wake up and clean up terminated streaming session
CVAR_WAIT(pSampleConfiguration->cvar, pSampleConfiguration->sampleConfigurationObjLock, SAMPLE_SESSION_CLEANUP_WAIT_PERIOD);
MUTEX_UNLOCK(pSampleConfiguration->sampleConfigurationObjLock);
Expand All @@ -1007,18 +1013,20 @@ STATUS sessionCleanupWait(PSampleConfiguration pSampleConfiguration)
return retStatus;
}

STATUS submitPendingIceCandidate(PStackQueue pPendingMessageQueue, PSampleStreamingSession pSampleStreamingSession)
STATUS submitPendingIceCandidate(PPendingMessageQueue pPendingMessageQueue, PSampleStreamingSession pSampleStreamingSession)
{
STATUS retStatus = STATUS_SUCCESS;
BOOL noPendingSignalingMessageForClient = FALSE;
PReceivedSignalingMessage pReceivedSignalingMessage = NULL;
UINT64 hashValue;

CHK(pPendingMessageQueue != NULL && pPendingMessageQueue->messageQueue != NULL && pSampleStreamingSession != NULL, STATUS_NULL_ARG);

do {
CHK_STATUS(stackQueueIsEmpty(pPendingMessageQueue, &noPendingSignalingMessageForClient));
CHK_STATUS(stackQueueIsEmpty(pPendingMessageQueue->messageQueue, &noPendingSignalingMessageForClient));
if (!noPendingSignalingMessageForClient) {
hashValue = 0;
CHK_STATUS(stackQueueDequeue(pPendingMessageQueue, &hashValue));
CHK_STATUS(stackQueueDequeue(pPendingMessageQueue->messageQueue, &hashValue));
pReceivedSignalingMessage = (PReceivedSignalingMessage) hashValue;
CHK(pReceivedSignalingMessage != NULL, STATUS_INTERNAL_ERROR);
if (pReceivedSignalingMessage->signalingMessage.messageType == SIGNALING_MESSAGE_TYPE_ICE_CANDIDATE) {
Expand All @@ -1027,7 +1035,8 @@ STATUS submitPendingIceCandidate(PStackQueue pPendingMessageQueue, PSampleStream
SAFE_MEMFREE(pReceivedSignalingMessage);
}
} while (!noPendingSignalingMessageForClient);
CHK_STATUS(stackQueueFree(pPendingMessageQueue));

CHK_STATUS(freeMessageQueue(pPendingMessageQueue));

CleanUp:

Expand All @@ -1044,7 +1053,7 @@ STATUS signalingMessageReceived(UINT64 customData, PReceivedSignalingMessage pRe
BOOL locked = TRUE;
UINT32 clientIdHash;
UINT64 hashValue = 0;
PStackQueue pPendingMessageQueue = NULL;
PPendingMessageQueue pPendingMessageQueue = NULL;
PSampleStreamingSession pSampleStreamingSession = NULL;
PReceivedSignalingMessage pReceivedSignalingMessageCopy = NULL;

Expand Down Expand Up @@ -1075,6 +1084,13 @@ STATUS signalingMessageReceived(UINT64 customData, PReceivedSignalingMessage pRe
*/
if (pSampleConfiguration->streamingSessionCount == ARRAY_SIZE(pSampleConfiguration->sampleStreamingSessionList)) {
DLOGW("Max simultaneous streaming session count reached.");

// Need to remove the pending queue if any.
// This is a simple optimization as the session cleanup will
// handle the cleanup of pending message queue after a while
CHK_STATUS(getPendingMessageQueueForHash(pSampleConfiguration->pPendingSignalingMessageForRemoteClient, clientIdHash, TRUE,
&pPendingMessageQueue));

CHK(FALSE, retStatus);
}
CHK_STATUS(createSampleStreamingSession(pSampleConfiguration, pReceivedSignalingMessage->signalingMessage.peerClientId, TRUE,
Expand All @@ -1088,15 +1104,13 @@ STATUS signalingMessageReceived(UINT64 customData, PReceivedSignalingMessage pRe
CHK_STATUS(hashTablePut(pSampleConfiguration->pRtcPeerConnectionForRemoteClient, clientIdHash, (UINT64) pSampleStreamingSession));

// If there are any ice candidate messages in the queue for this client id, submit them now.
if (STATUS_SUCCEEDED(hashTableGet(pSampleConfiguration->pPendingSignalingMessageForRemoteClient, clientIdHash, &hashValue))) {
pPendingMessageQueue = (PStackQueue) hashValue;

CHK_STATUS(getPendingMessageQueueForHash(pSampleConfiguration->pPendingSignalingMessageForRemoteClient, clientIdHash, TRUE,
&pPendingMessageQueue));
if (pPendingMessageQueue != NULL) {
CHK_STATUS(submitPendingIceCandidate(pPendingMessageQueue, pSampleStreamingSession));

// NULL the pointer to avoid it being freed in the cleanup
pPendingMessageQueue = NULL;

CHK_STATUS(hashTableRemove(pSampleConfiguration->pPendingSignalingMessageForRemoteClient, clientIdHash));
}
break;

Expand All @@ -1111,16 +1125,14 @@ STATUS signalingMessageReceived(UINT64 customData, PReceivedSignalingMessage pRe
CHK_STATUS(handleAnswer(pSampleConfiguration, pSampleStreamingSession, &pReceivedSignalingMessage->signalingMessage));
CHK_STATUS(hashTablePut(pSampleConfiguration->pRtcPeerConnectionForRemoteClient, clientIdHash, (UINT64) pSampleStreamingSession));

hashValue = 0;
// If there are any ice candidate messages in the queue for this client id, submit them now.
if (STATUS_SUCCEEDED(hashTableGet(pSampleConfiguration->pPendingSignalingMessageForRemoteClient, clientIdHash, &hashValue))) {
pPendingMessageQueue = (PStackQueue) hashValue;
CHK_STATUS(getPendingMessageQueueForHash(pSampleConfiguration->pPendingSignalingMessageForRemoteClient, clientIdHash, TRUE,
&pPendingMessageQueue));
if (pPendingMessageQueue != NULL) {
CHK_STATUS(submitPendingIceCandidate(pPendingMessageQueue, pSampleStreamingSession));

// NULL the pointer to avoid it being freed in the cleanup
pPendingMessageQueue = NULL;

CHK_STATUS(hashTableRemove(pSampleConfiguration->pPendingSignalingMessageForRemoteClient, clientIdHash));
}
break;

Expand All @@ -1130,21 +1142,18 @@ STATUS signalingMessageReceived(UINT64 customData, PReceivedSignalingMessage pRe
* submit the signaling message into the corresponding streaming session.
*/
if (!peerConnectionFound) {
hashValue = 0;
if (STATUS_HASH_KEY_NOT_PRESENT ==
hashTableGet(pSampleConfiguration->pPendingSignalingMessageForRemoteClient, clientIdHash, &hashValue)) {
CHK_STATUS(stackQueueCreate(&pPendingMessageQueue));
CHK_STATUS(
hashTablePut(pSampleConfiguration->pPendingSignalingMessageForRemoteClient, clientIdHash, (UINT64) pPendingMessageQueue));
} else {
pPendingMessageQueue = (PStackQueue) hashValue;
CHK_STATUS(getPendingMessageQueueForHash(pSampleConfiguration->pPendingSignalingMessageForRemoteClient, clientIdHash, FALSE,
&pPendingMessageQueue));
if (pPendingMessageQueue == NULL) {
CHK_STATUS(createMessageQueue(clientIdHash, &pPendingMessageQueue));
CHK_STATUS(stackQueueEnqueue(pSampleConfiguration->pPendingSignalingMessageForRemoteClient, (UINT64) pPendingMessageQueue));
}

pReceivedSignalingMessageCopy = (PReceivedSignalingMessage) MEMCALLOC(1, SIZEOF(ReceivedSignalingMessage));

*pReceivedSignalingMessageCopy = *pReceivedSignalingMessage;

CHK_STATUS(stackQueueEnqueue(pPendingMessageQueue, (UINT64) pReceivedSignalingMessageCopy));
CHK_STATUS(stackQueueEnqueue(pPendingMessageQueue->messageQueue, (UINT64) pReceivedSignalingMessageCopy));

// NULL the pointers to not free any longer
pPendingMessageQueue = NULL;
Expand All @@ -1163,7 +1172,7 @@ STATUS signalingMessageReceived(UINT64 customData, PReceivedSignalingMessage pRe

SAFE_MEMFREE(pReceivedSignalingMessageCopy);
if (pPendingMessageQueue != NULL) {
stackQueueFree(pPendingMessageQueue);
freeMessageQueue(pPendingMessageQueue);
}

if (locked) {
Expand All @@ -1173,3 +1182,113 @@ STATUS signalingMessageReceived(UINT64 customData, PReceivedSignalingMessage pRe
CHK_LOG_ERR(retStatus);
return retStatus;
}

STATUS createMessageQueue(UINT64 hashValue, PPendingMessageQueue* ppPendingMessageQueue)
{
STATUS retStatus = STATUS_SUCCESS;
PPendingMessageQueue pPendingMessageQueue = NULL;

CHK(ppPendingMessageQueue != NULL, STATUS_NULL_ARG);

CHK(NULL != (pPendingMessageQueue = (PPendingMessageQueue) MEMCALLOC(1, SIZEOF(PendingMessageQueue))), STATUS_NOT_ENOUGH_MEMORY);
pPendingMessageQueue->hashValue = hashValue;
pPendingMessageQueue->createTime = GETTIME();
CHK_STATUS(stackQueueCreate(&pPendingMessageQueue->messageQueue));

CleanUp:

if (STATUS_FAILED(retStatus) && pPendingMessageQueue != NULL) {
freeMessageQueue(pPendingMessageQueue);
pPendingMessageQueue = NULL;
}

if (ppPendingMessageQueue != NULL) {
*ppPendingMessageQueue = pPendingMessageQueue;
}

return retStatus;
}

STATUS freeMessageQueue(PPendingMessageQueue pPendingMessageQueue)
{
STATUS retStatus = STATUS_SUCCESS;

// free is idempotent
CHK(pPendingMessageQueue != NULL, retStatus);

if (pPendingMessageQueue->messageQueue != NULL) {
stackQueueClear(pPendingMessageQueue->messageQueue, TRUE);
stackQueueFree(pPendingMessageQueue->messageQueue);
}

MEMFREE(pPendingMessageQueue);

CleanUp:
return retStatus;
}

STATUS getPendingMessageQueueForHash(PStackQueue pPendingQueue, UINT64 clientHash, BOOL remove, PPendingMessageQueue* ppPendingMessageQueue)
{
STATUS retStatus = STATUS_SUCCESS;
PPendingMessageQueue pPendingMessageQueue = NULL;
StackQueueIterator iterator;
BOOL iterate = TRUE;
UINT64 data;

CHK(pPendingQueue != NULL && ppPendingMessageQueue != NULL, STATUS_NULL_ARG);

CHK_STATUS(stackQueueGetIterator(pPendingQueue, &iterator));
while (iterate && IS_VALID_ITERATOR(iterator)) {
CHK_STATUS(stackQueueIteratorGetItem(iterator, &data));
CHK_STATUS(stackQueueIteratorNext(&iterator));

pPendingMessageQueue = (PPendingMessageQueue) data;

if (clientHash == pPendingMessageQueue->hashValue) {
*ppPendingMessageQueue = pPendingMessageQueue;
iterate = FALSE;

// Check if the item needs to be removed
if (remove) {
// This is OK to do as we are terminating the iterator anyway
CHK_STATUS(stackQueueRemoveItem(pPendingQueue, data));
}
}
}

CleanUp:

return retStatus;
}

STATUS removeExpiredMessageQueues(PStackQueue pPendingQueue)
{
STATUS retStatus = STATUS_SUCCESS;
PPendingMessageQueue pPendingMessageQueue = NULL;
UINT32 i, count;
UINT64 data, curTime;

CHK(pPendingQueue != NULL, STATUS_NULL_ARG);

curTime = GETTIME();
CHK_STATUS(stackQueueGetCount(pPendingQueue, &count));

// Dequeue and enqueue in order to not break the iterator while removing an item
for (i = 0; i < count; i++) {
CHK_STATUS(stackQueueDequeue(pPendingQueue, &data));

// Check for expiry
pPendingMessageQueue = (PPendingMessageQueue) data;
if (pPendingMessageQueue->createTime + SAMPLE_PENDING_MESSAGE_CLEANUP_DURATION < curTime) {
// Message queue has expired and needs to be freed
CHK_STATUS(freeMessageQueue(pPendingMessageQueue));
} else {
// Enqueue back again as it's still valued
CHK_STATUS(stackQueueEnqueue(pPendingQueue, data));
}
}

CleanUp:

return retStatus;
}
15 changes: 14 additions & 1 deletion samples/Samples.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ extern "C" {

#define SAMPLE_SESSION_CLEANUP_WAIT_PERIOD (5 * HUNDREDS_OF_NANOS_IN_A_SECOND)

#define SAMPLE_PENDING_MESSAGE_CLEANUP_DURATION (20 * HUNDREDS_OF_NANOS_IN_A_SECOND)

#define CA_CERT_PEM_FILE_EXTENSION ".pem"

#define FILE_LOGGING_BUFFER_SIZE (100 * 1024)
Expand Down Expand Up @@ -77,7 +79,7 @@ typedef struct {
RtcOnDataChannel onDataChannel;

TID signalingProcessor;
PHashTable pPendingSignalingMessageForRemoteClient;
PStackQueue pPendingSignalingMessageForRemoteClient;
PHashTable pRtcPeerConnectionForRemoteClient;

MUTEX sampleConfigurationObjLock;
Expand All @@ -97,6 +99,12 @@ typedef struct {
MUTEX signalingSendMessageLock;
} SampleConfiguration, *PSampleConfiguration;

typedef struct {
UINT64 hashValue;
UINT64 createTime;
PStackQueue messageQueue;
} PendingMessageQueue, *PPendingMessageQueue;

typedef VOID (*StreamSessionShutdownCallback)(UINT64, PSampleStreamingSession);

struct __SampleStreamingSession {
Expand Down Expand Up @@ -155,6 +163,11 @@ STATUS sessionCleanupWait(PSampleConfiguration);
STATUS logSignalingClientStats(PSignalingClientMetrics);
STATUS logSelectedIceCandidatesInformation(PSampleStreamingSession);
STATUS logStartUpLatency(PSampleConfiguration);
STATUS createMessageQueue(UINT64, PPendingMessageQueue*);
STATUS freeMessageQueue(PPendingMessageQueue);
STATUS submitPendingIceCandidate(PPendingMessageQueue, PSampleStreamingSession);
STATUS removeExpiredMessageQueues(PStackQueue);
STATUS getPendingMessageQueueForHash(PStackQueue, UINT64, BOOL, PPendingMessageQueue*);

#ifdef __cplusplus
}
Expand Down
14 changes: 6 additions & 8 deletions src/source/Ice/IceAgent.c
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ STATUS createIceAgent(PCHAR username, PCHAR password, PIceAgentCallbacks pIceAge
STATUS_INVALID_ARG);

// allocate the entire struct
pIceAgent = (PIceAgent) MEMCALLOC(1, SIZEOF(IceAgent));
CHK(NULL != (pIceAgent = (PIceAgent) MEMCALLOC(1, SIZEOF(IceAgent))), STATUS_NOT_ENOUGH_MEMORY);
STRNCPY(pIceAgent->localUsername, username, MAX_ICE_CONFIG_USER_NAME_LEN);
STRNCPY(pIceAgent->localPassword, password, MAX_ICE_CONFIG_CREDENTIAL_LEN);

Expand Down Expand Up @@ -1689,9 +1689,8 @@ STATUS iceAgentInitSrflxCandidate(PIceAgent pIceAgent)
// open up a new socket at host candidate's ip address for server reflex candidate.
// the new port will be stored in pNewCandidate->ipAddress.port. And the Ip address will later be updated
// with the correct ip address once the STUN response is received.
CHK_STATUS(createSocketConnection(pCandidate->ipAddress.family, KVS_SOCKET_PROTOCOL_UDP, &pCandidate->ipAddress, NULL,
(UINT64) pIceAgent, incomingDataHandler, pIceAgent->kvsRtcConfiguration.sendBufSize,
&pCandidate->pSocketConnection));
CHK_STATUS(createSocketConnection(pCandidate->ipAddress.family, KVS_SOCKET_PROTOCOL_UDP, &pCandidate->ipAddress, NULL, (UINT64) pIceAgent,
incomingDataHandler, pIceAgent->kvsRtcConfiguration.sendBufSize, &pCandidate->pSocketConnection));
ATOMIC_STORE_BOOL(&pCandidate->pSocketConnection->receiveData, TRUE);
// connectionListener will free the pSocketConnection at the end.
CHK_STATUS(connectionListenerAddConnection(pIceAgent->pConnectionListener, pCandidate->pSocketConnection));
Expand Down Expand Up @@ -2088,12 +2087,11 @@ STATUS iceAgentReadyStateSetup(PIceAgent pIceAgent)

pIceAgent->pDataSendingIceCandidatePair = pNominatedAndValidCandidatePair;
CHK_STATUS(getIpAddrStr(&pIceAgent->pDataSendingIceCandidatePair->local->ipAddress, ipAddrStr, ARRAY_SIZE(ipAddrStr)));
DLOGD("Selected pair %s_%s, local candidate type: %s. Round trip time %u ms. Local candidate priority: %u, ice candidate pair priority: %" PRIu64, pIceAgent->pDataSendingIceCandidatePair->local->id,
pIceAgent->pDataSendingIceCandidatePair->remote->id,
DLOGD("Selected pair %s_%s, local candidate type: %s. Round trip time %u ms. Local candidate priority: %u, ice candidate pair priority: %" PRIu64,
pIceAgent->pDataSendingIceCandidatePair->local->id, pIceAgent->pDataSendingIceCandidatePair->remote->id,
iceAgentGetCandidateTypeStr(pIceAgent->pDataSendingIceCandidatePair->local->iceCandidateType),
pIceAgent->pDataSendingIceCandidatePair->roundTripTime / HUNDREDS_OF_NANOS_IN_A_MILLISECOND,
pIceAgent->pDataSendingIceCandidatePair->local->priority,
pIceAgent->pDataSendingIceCandidatePair->priority);
pIceAgent->pDataSendingIceCandidatePair->local->priority, pIceAgent->pDataSendingIceCandidatePair->priority);

/* no state timeout for ready state */
pIceAgent->stateEndTime = INVALID_TIMESTAMP_VALUE;
Expand Down

0 comments on commit f3a34c2

Please sign in to comment.