From f3a34c21ab6018074607fc3cb3ac3701314bee67 Mon Sep 17 00:00:00 2001 From: Mushegh Malkhasyan <33669149+MushMal@users.noreply.github.com> Date: Tue, 2 Feb 2021 08:05:06 -0800 Subject: [PATCH] Removing pending queue if session creation fails (#1061) * Removing pending queue if session creation fails * Doing a more thorough job in tracking and cleaning --- samples/Common.c | 197 ++++++++++++++++++++++++++++++-------- samples/Samples.h | 15 ++- src/source/Ice/IceAgent.c | 14 ++- 3 files changed, 178 insertions(+), 48 deletions(-) diff --git a/samples/Common.c b/samples/Common.c index d1618cba66..100415d029 100644 --- a/samples/Common.c +++ b/samples/Common.c @@ -699,8 +699,7 @@ STATUS createSampleConfiguration(PCHAR channelName, SIGNALING_CHANNEL_ROLE_TYPE pSampleConfiguration->iceUriCount = 0; - CHK_STATUS(hashTableCreateWithParams(SAMPLE_HASH_TABLE_BUCKET_COUNT, SAMPLE_HASH_TABLE_BUCKET_LENGTH, - &pSampleConfiguration->pPendingSignalingMessageForRemoteClient)); + CHK_STATUS(stackQueueCreate(&pSampleConfiguration->pPendingSignalingMessageForRemoteClient)); CHK_STATUS(hashTableCreateWithParams(SAMPLE_HASH_TABLE_BUCKET_COUNT, SAMPLE_HASH_TABLE_BUCKET_LENGTH, &pSampleConfiguration->pRtcPeerConnectionForRemoteClient)); @@ -838,30 +837,34 @@ STATUS getIceCandidatePairStatsCallback(UINT32 timerId, UINT64 currentTime, UINT return retStatus; } -STATUS freePendingSignalingMessageQueue(UINT64 customData, PHashEntry pHashEntry) -{ - UNUSED_PARAM(customData); - PStackQueue pStackQueue = (PStackQueue) pHashEntry->value; - stackQueueClear(pStackQueue, TRUE); - stackQueueFree(pStackQueue); - return STATUS_SUCCESS; -} - STATUS freeSampleConfiguration(PSampleConfiguration* ppSampleConfiguration) { ENTERS(); STATUS retStatus = STATUS_SUCCESS; PSampleConfiguration pSampleConfiguration; UINT32 i; + UINT64 data; + StackQueueIterator iterator; BOOL locked = FALSE; CHK(ppSampleConfiguration != NULL, STATUS_NULL_ARG); pSampleConfiguration = *ppSampleConfiguration; CHK(pSampleConfiguration != NULL, retStatus); - hashTableIterateEntries(pSampleConfiguration->pPendingSignalingMessageForRemoteClient, (UINT64) NULL, freePendingSignalingMessageQueue); - hashTableClear(pSampleConfiguration->pPendingSignalingMessageForRemoteClient); - hashTableFree(pSampleConfiguration->pPendingSignalingMessageForRemoteClient); + + if (pSampleConfiguration->pPendingSignalingMessageForRemoteClient != NULL) { + // Iterate and free all the pending queues + stackQueueGetIterator(pSampleConfiguration->pPendingSignalingMessageForRemoteClient, &iterator); + while (IS_VALID_ITERATOR(iterator)) { + stackQueueIteratorGetItem(iterator, &data); + stackQueueIteratorNext(&iterator); + freeMessageQueue((PPendingMessageQueue) data); + } + + stackQueueClear(pSampleConfiguration->pPendingSignalingMessageForRemoteClient, FALSE); + stackQueueFree(pSampleConfiguration->pPendingSignalingMessageForRemoteClient); + pSampleConfiguration->pPendingSignalingMessageForRemoteClient = NULL; + } hashTableClear(pSampleConfiguration->pRtcPeerConnectionForRemoteClient); hashTableFree(pSampleConfiguration->pRtcPeerConnectionForRemoteClient); @@ -989,6 +992,9 @@ STATUS sessionCleanupWait(PSampleConfiguration pSampleConfiguration) } } + // Check if any lingering pending message queues + CHK_STATUS(removeExpiredMessageQueues(pSampleConfiguration->pPendingSignalingMessageForRemoteClient)); + // periodically wake up and clean up terminated streaming session CVAR_WAIT(pSampleConfiguration->cvar, pSampleConfiguration->sampleConfigurationObjLock, SAMPLE_SESSION_CLEANUP_WAIT_PERIOD); MUTEX_UNLOCK(pSampleConfiguration->sampleConfigurationObjLock); @@ -1007,18 +1013,20 @@ STATUS sessionCleanupWait(PSampleConfiguration pSampleConfiguration) return retStatus; } -STATUS submitPendingIceCandidate(PStackQueue pPendingMessageQueue, PSampleStreamingSession pSampleStreamingSession) +STATUS submitPendingIceCandidate(PPendingMessageQueue pPendingMessageQueue, PSampleStreamingSession pSampleStreamingSession) { STATUS retStatus = STATUS_SUCCESS; BOOL noPendingSignalingMessageForClient = FALSE; PReceivedSignalingMessage pReceivedSignalingMessage = NULL; UINT64 hashValue; + CHK(pPendingMessageQueue != NULL && pPendingMessageQueue->messageQueue != NULL && pSampleStreamingSession != NULL, STATUS_NULL_ARG); + do { - CHK_STATUS(stackQueueIsEmpty(pPendingMessageQueue, &noPendingSignalingMessageForClient)); + CHK_STATUS(stackQueueIsEmpty(pPendingMessageQueue->messageQueue, &noPendingSignalingMessageForClient)); if (!noPendingSignalingMessageForClient) { hashValue = 0; - CHK_STATUS(stackQueueDequeue(pPendingMessageQueue, &hashValue)); + CHK_STATUS(stackQueueDequeue(pPendingMessageQueue->messageQueue, &hashValue)); pReceivedSignalingMessage = (PReceivedSignalingMessage) hashValue; CHK(pReceivedSignalingMessage != NULL, STATUS_INTERNAL_ERROR); if (pReceivedSignalingMessage->signalingMessage.messageType == SIGNALING_MESSAGE_TYPE_ICE_CANDIDATE) { @@ -1027,7 +1035,8 @@ STATUS submitPendingIceCandidate(PStackQueue pPendingMessageQueue, PSampleStream SAFE_MEMFREE(pReceivedSignalingMessage); } } while (!noPendingSignalingMessageForClient); - CHK_STATUS(stackQueueFree(pPendingMessageQueue)); + + CHK_STATUS(freeMessageQueue(pPendingMessageQueue)); CleanUp: @@ -1044,7 +1053,7 @@ STATUS signalingMessageReceived(UINT64 customData, PReceivedSignalingMessage pRe BOOL locked = TRUE; UINT32 clientIdHash; UINT64 hashValue = 0; - PStackQueue pPendingMessageQueue = NULL; + PPendingMessageQueue pPendingMessageQueue = NULL; PSampleStreamingSession pSampleStreamingSession = NULL; PReceivedSignalingMessage pReceivedSignalingMessageCopy = NULL; @@ -1075,6 +1084,13 @@ STATUS signalingMessageReceived(UINT64 customData, PReceivedSignalingMessage pRe */ if (pSampleConfiguration->streamingSessionCount == ARRAY_SIZE(pSampleConfiguration->sampleStreamingSessionList)) { DLOGW("Max simultaneous streaming session count reached."); + + // Need to remove the pending queue if any. + // This is a simple optimization as the session cleanup will + // handle the cleanup of pending message queue after a while + CHK_STATUS(getPendingMessageQueueForHash(pSampleConfiguration->pPendingSignalingMessageForRemoteClient, clientIdHash, TRUE, + &pPendingMessageQueue)); + CHK(FALSE, retStatus); } CHK_STATUS(createSampleStreamingSession(pSampleConfiguration, pReceivedSignalingMessage->signalingMessage.peerClientId, TRUE, @@ -1088,15 +1104,13 @@ STATUS signalingMessageReceived(UINT64 customData, PReceivedSignalingMessage pRe CHK_STATUS(hashTablePut(pSampleConfiguration->pRtcPeerConnectionForRemoteClient, clientIdHash, (UINT64) pSampleStreamingSession)); // If there are any ice candidate messages in the queue for this client id, submit them now. - if (STATUS_SUCCEEDED(hashTableGet(pSampleConfiguration->pPendingSignalingMessageForRemoteClient, clientIdHash, &hashValue))) { - pPendingMessageQueue = (PStackQueue) hashValue; - + CHK_STATUS(getPendingMessageQueueForHash(pSampleConfiguration->pPendingSignalingMessageForRemoteClient, clientIdHash, TRUE, + &pPendingMessageQueue)); + if (pPendingMessageQueue != NULL) { CHK_STATUS(submitPendingIceCandidate(pPendingMessageQueue, pSampleStreamingSession)); // NULL the pointer to avoid it being freed in the cleanup pPendingMessageQueue = NULL; - - CHK_STATUS(hashTableRemove(pSampleConfiguration->pPendingSignalingMessageForRemoteClient, clientIdHash)); } break; @@ -1111,16 +1125,14 @@ STATUS signalingMessageReceived(UINT64 customData, PReceivedSignalingMessage pRe CHK_STATUS(handleAnswer(pSampleConfiguration, pSampleStreamingSession, &pReceivedSignalingMessage->signalingMessage)); CHK_STATUS(hashTablePut(pSampleConfiguration->pRtcPeerConnectionForRemoteClient, clientIdHash, (UINT64) pSampleStreamingSession)); - hashValue = 0; // If there are any ice candidate messages in the queue for this client id, submit them now. - if (STATUS_SUCCEEDED(hashTableGet(pSampleConfiguration->pPendingSignalingMessageForRemoteClient, clientIdHash, &hashValue))) { - pPendingMessageQueue = (PStackQueue) hashValue; + CHK_STATUS(getPendingMessageQueueForHash(pSampleConfiguration->pPendingSignalingMessageForRemoteClient, clientIdHash, TRUE, + &pPendingMessageQueue)); + if (pPendingMessageQueue != NULL) { CHK_STATUS(submitPendingIceCandidate(pPendingMessageQueue, pSampleStreamingSession)); // NULL the pointer to avoid it being freed in the cleanup pPendingMessageQueue = NULL; - - CHK_STATUS(hashTableRemove(pSampleConfiguration->pPendingSignalingMessageForRemoteClient, clientIdHash)); } break; @@ -1130,21 +1142,18 @@ STATUS signalingMessageReceived(UINT64 customData, PReceivedSignalingMessage pRe * submit the signaling message into the corresponding streaming session. */ if (!peerConnectionFound) { - hashValue = 0; - if (STATUS_HASH_KEY_NOT_PRESENT == - hashTableGet(pSampleConfiguration->pPendingSignalingMessageForRemoteClient, clientIdHash, &hashValue)) { - CHK_STATUS(stackQueueCreate(&pPendingMessageQueue)); - CHK_STATUS( - hashTablePut(pSampleConfiguration->pPendingSignalingMessageForRemoteClient, clientIdHash, (UINT64) pPendingMessageQueue)); - } else { - pPendingMessageQueue = (PStackQueue) hashValue; + CHK_STATUS(getPendingMessageQueueForHash(pSampleConfiguration->pPendingSignalingMessageForRemoteClient, clientIdHash, FALSE, + &pPendingMessageQueue)); + if (pPendingMessageQueue == NULL) { + CHK_STATUS(createMessageQueue(clientIdHash, &pPendingMessageQueue)); + CHK_STATUS(stackQueueEnqueue(pSampleConfiguration->pPendingSignalingMessageForRemoteClient, (UINT64) pPendingMessageQueue)); } pReceivedSignalingMessageCopy = (PReceivedSignalingMessage) MEMCALLOC(1, SIZEOF(ReceivedSignalingMessage)); *pReceivedSignalingMessageCopy = *pReceivedSignalingMessage; - CHK_STATUS(stackQueueEnqueue(pPendingMessageQueue, (UINT64) pReceivedSignalingMessageCopy)); + CHK_STATUS(stackQueueEnqueue(pPendingMessageQueue->messageQueue, (UINT64) pReceivedSignalingMessageCopy)); // NULL the pointers to not free any longer pPendingMessageQueue = NULL; @@ -1163,7 +1172,7 @@ STATUS signalingMessageReceived(UINT64 customData, PReceivedSignalingMessage pRe SAFE_MEMFREE(pReceivedSignalingMessageCopy); if (pPendingMessageQueue != NULL) { - stackQueueFree(pPendingMessageQueue); + freeMessageQueue(pPendingMessageQueue); } if (locked) { @@ -1173,3 +1182,113 @@ STATUS signalingMessageReceived(UINT64 customData, PReceivedSignalingMessage pRe CHK_LOG_ERR(retStatus); return retStatus; } + +STATUS createMessageQueue(UINT64 hashValue, PPendingMessageQueue* ppPendingMessageQueue) +{ + STATUS retStatus = STATUS_SUCCESS; + PPendingMessageQueue pPendingMessageQueue = NULL; + + CHK(ppPendingMessageQueue != NULL, STATUS_NULL_ARG); + + CHK(NULL != (pPendingMessageQueue = (PPendingMessageQueue) MEMCALLOC(1, SIZEOF(PendingMessageQueue))), STATUS_NOT_ENOUGH_MEMORY); + pPendingMessageQueue->hashValue = hashValue; + pPendingMessageQueue->createTime = GETTIME(); + CHK_STATUS(stackQueueCreate(&pPendingMessageQueue->messageQueue)); + +CleanUp: + + if (STATUS_FAILED(retStatus) && pPendingMessageQueue != NULL) { + freeMessageQueue(pPendingMessageQueue); + pPendingMessageQueue = NULL; + } + + if (ppPendingMessageQueue != NULL) { + *ppPendingMessageQueue = pPendingMessageQueue; + } + + return retStatus; +} + +STATUS freeMessageQueue(PPendingMessageQueue pPendingMessageQueue) +{ + STATUS retStatus = STATUS_SUCCESS; + + // free is idempotent + CHK(pPendingMessageQueue != NULL, retStatus); + + if (pPendingMessageQueue->messageQueue != NULL) { + stackQueueClear(pPendingMessageQueue->messageQueue, TRUE); + stackQueueFree(pPendingMessageQueue->messageQueue); + } + + MEMFREE(pPendingMessageQueue); + +CleanUp: + return retStatus; +} + +STATUS getPendingMessageQueueForHash(PStackQueue pPendingQueue, UINT64 clientHash, BOOL remove, PPendingMessageQueue* ppPendingMessageQueue) +{ + STATUS retStatus = STATUS_SUCCESS; + PPendingMessageQueue pPendingMessageQueue = NULL; + StackQueueIterator iterator; + BOOL iterate = TRUE; + UINT64 data; + + CHK(pPendingQueue != NULL && ppPendingMessageQueue != NULL, STATUS_NULL_ARG); + + CHK_STATUS(stackQueueGetIterator(pPendingQueue, &iterator)); + while (iterate && IS_VALID_ITERATOR(iterator)) { + CHK_STATUS(stackQueueIteratorGetItem(iterator, &data)); + CHK_STATUS(stackQueueIteratorNext(&iterator)); + + pPendingMessageQueue = (PPendingMessageQueue) data; + + if (clientHash == pPendingMessageQueue->hashValue) { + *ppPendingMessageQueue = pPendingMessageQueue; + iterate = FALSE; + + // Check if the item needs to be removed + if (remove) { + // This is OK to do as we are terminating the iterator anyway + CHK_STATUS(stackQueueRemoveItem(pPendingQueue, data)); + } + } + } + +CleanUp: + + return retStatus; +} + +STATUS removeExpiredMessageQueues(PStackQueue pPendingQueue) +{ + STATUS retStatus = STATUS_SUCCESS; + PPendingMessageQueue pPendingMessageQueue = NULL; + UINT32 i, count; + UINT64 data, curTime; + + CHK(pPendingQueue != NULL, STATUS_NULL_ARG); + + curTime = GETTIME(); + CHK_STATUS(stackQueueGetCount(pPendingQueue, &count)); + + // Dequeue and enqueue in order to not break the iterator while removing an item + for (i = 0; i < count; i++) { + CHK_STATUS(stackQueueDequeue(pPendingQueue, &data)); + + // Check for expiry + pPendingMessageQueue = (PPendingMessageQueue) data; + if (pPendingMessageQueue->createTime + SAMPLE_PENDING_MESSAGE_CLEANUP_DURATION < curTime) { + // Message queue has expired and needs to be freed + CHK_STATUS(freeMessageQueue(pPendingMessageQueue)); + } else { + // Enqueue back again as it's still valued + CHK_STATUS(stackQueueEnqueue(pPendingQueue, data)); + } + } + +CleanUp: + + return retStatus; +} diff --git a/samples/Samples.h b/samples/Samples.h index 4a68e4a553..a8d6095269 100644 --- a/samples/Samples.h +++ b/samples/Samples.h @@ -27,6 +27,8 @@ extern "C" { #define SAMPLE_SESSION_CLEANUP_WAIT_PERIOD (5 * HUNDREDS_OF_NANOS_IN_A_SECOND) +#define SAMPLE_PENDING_MESSAGE_CLEANUP_DURATION (20 * HUNDREDS_OF_NANOS_IN_A_SECOND) + #define CA_CERT_PEM_FILE_EXTENSION ".pem" #define FILE_LOGGING_BUFFER_SIZE (100 * 1024) @@ -77,7 +79,7 @@ typedef struct { RtcOnDataChannel onDataChannel; TID signalingProcessor; - PHashTable pPendingSignalingMessageForRemoteClient; + PStackQueue pPendingSignalingMessageForRemoteClient; PHashTable pRtcPeerConnectionForRemoteClient; MUTEX sampleConfigurationObjLock; @@ -97,6 +99,12 @@ typedef struct { MUTEX signalingSendMessageLock; } SampleConfiguration, *PSampleConfiguration; +typedef struct { + UINT64 hashValue; + UINT64 createTime; + PStackQueue messageQueue; +} PendingMessageQueue, *PPendingMessageQueue; + typedef VOID (*StreamSessionShutdownCallback)(UINT64, PSampleStreamingSession); struct __SampleStreamingSession { @@ -155,6 +163,11 @@ STATUS sessionCleanupWait(PSampleConfiguration); STATUS logSignalingClientStats(PSignalingClientMetrics); STATUS logSelectedIceCandidatesInformation(PSampleStreamingSession); STATUS logStartUpLatency(PSampleConfiguration); +STATUS createMessageQueue(UINT64, PPendingMessageQueue*); +STATUS freeMessageQueue(PPendingMessageQueue); +STATUS submitPendingIceCandidate(PPendingMessageQueue, PSampleStreamingSession); +STATUS removeExpiredMessageQueues(PStackQueue); +STATUS getPendingMessageQueueForHash(PStackQueue, UINT64, BOOL, PPendingMessageQueue*); #ifdef __cplusplus } diff --git a/src/source/Ice/IceAgent.c b/src/source/Ice/IceAgent.c index 917f9398eb..0917cc4fac 100644 --- a/src/source/Ice/IceAgent.c +++ b/src/source/Ice/IceAgent.c @@ -36,7 +36,7 @@ STATUS createIceAgent(PCHAR username, PCHAR password, PIceAgentCallbacks pIceAge STATUS_INVALID_ARG); // allocate the entire struct - pIceAgent = (PIceAgent) MEMCALLOC(1, SIZEOF(IceAgent)); + CHK(NULL != (pIceAgent = (PIceAgent) MEMCALLOC(1, SIZEOF(IceAgent))), STATUS_NOT_ENOUGH_MEMORY); STRNCPY(pIceAgent->localUsername, username, MAX_ICE_CONFIG_USER_NAME_LEN); STRNCPY(pIceAgent->localPassword, password, MAX_ICE_CONFIG_CREDENTIAL_LEN); @@ -1689,9 +1689,8 @@ STATUS iceAgentInitSrflxCandidate(PIceAgent pIceAgent) // open up a new socket at host candidate's ip address for server reflex candidate. // the new port will be stored in pNewCandidate->ipAddress.port. And the Ip address will later be updated // with the correct ip address once the STUN response is received. - CHK_STATUS(createSocketConnection(pCandidate->ipAddress.family, KVS_SOCKET_PROTOCOL_UDP, &pCandidate->ipAddress, NULL, - (UINT64) pIceAgent, incomingDataHandler, pIceAgent->kvsRtcConfiguration.sendBufSize, - &pCandidate->pSocketConnection)); + CHK_STATUS(createSocketConnection(pCandidate->ipAddress.family, KVS_SOCKET_PROTOCOL_UDP, &pCandidate->ipAddress, NULL, (UINT64) pIceAgent, + incomingDataHandler, pIceAgent->kvsRtcConfiguration.sendBufSize, &pCandidate->pSocketConnection)); ATOMIC_STORE_BOOL(&pCandidate->pSocketConnection->receiveData, TRUE); // connectionListener will free the pSocketConnection at the end. CHK_STATUS(connectionListenerAddConnection(pIceAgent->pConnectionListener, pCandidate->pSocketConnection)); @@ -2088,12 +2087,11 @@ STATUS iceAgentReadyStateSetup(PIceAgent pIceAgent) pIceAgent->pDataSendingIceCandidatePair = pNominatedAndValidCandidatePair; CHK_STATUS(getIpAddrStr(&pIceAgent->pDataSendingIceCandidatePair->local->ipAddress, ipAddrStr, ARRAY_SIZE(ipAddrStr))); - DLOGD("Selected pair %s_%s, local candidate type: %s. Round trip time %u ms. Local candidate priority: %u, ice candidate pair priority: %" PRIu64, pIceAgent->pDataSendingIceCandidatePair->local->id, - pIceAgent->pDataSendingIceCandidatePair->remote->id, + DLOGD("Selected pair %s_%s, local candidate type: %s. Round trip time %u ms. Local candidate priority: %u, ice candidate pair priority: %" PRIu64, + pIceAgent->pDataSendingIceCandidatePair->local->id, pIceAgent->pDataSendingIceCandidatePair->remote->id, iceAgentGetCandidateTypeStr(pIceAgent->pDataSendingIceCandidatePair->local->iceCandidateType), pIceAgent->pDataSendingIceCandidatePair->roundTripTime / HUNDREDS_OF_NANOS_IN_A_MILLISECOND, - pIceAgent->pDataSendingIceCandidatePair->local->priority, - pIceAgent->pDataSendingIceCandidatePair->priority); + pIceAgent->pDataSendingIceCandidatePair->local->priority, pIceAgent->pDataSendingIceCandidatePair->priority); /* no state timeout for ready state */ pIceAgent->stateEndTime = INVALID_TIMESTAMP_VALUE;