Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

thread cancel memory leak, recreate signaling client & lws_context whenever a significant error has occurred, verbose and debug logging for ice & turn #1641

Merged
merged 8 commits into from
Feb 12, 2023
41 changes: 30 additions & 11 deletions samples/Common.c
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,13 @@ VOID sigintHandler(INT32 sigNum)
}
}

/*
 * Tells whether a status code is one of the signaling "call failed" errors.
 *
 * status - status code to classify.
 *
 * Returns 1 when status equals any of the GET_TOKEN, DESCRIBE, CREATE,
 * GET_ENDPOINT, GET_ICE_CONFIG or CONNECT call-failed codes, and 0 for
 * every other value. The result is a truth value carried in a STATUS,
 * not an error code.
 */
STATUS signalingCallFailed(STATUS status)
{
    STATUS isCallFailure = 0;

    switch (status) {
        case STATUS_SIGNALING_GET_TOKEN_CALL_FAILED:
        case STATUS_SIGNALING_DESCRIBE_CALL_FAILED:
        case STATUS_SIGNALING_CREATE_CALL_FAILED:
        case STATUS_SIGNALING_GET_ENDPOINT_CALL_FAILED:
        case STATUS_SIGNALING_GET_ICE_CONFIG_CALL_FAILED:
        case STATUS_SIGNALING_CONNECT_CALL_FAILED:
            isCallFailure = 1;
            break;

        default:
            break;
    }

    return isCallFailure;
}

VOID onDataChannelMessage(UINT64 customData, PRtcDataChannel pDataChannel, BOOL isBinary, PBYTE pMessage, UINT32 pMessageLen)
{
UNUSED_PARAM(customData);
Expand Down Expand Up @@ -152,7 +159,9 @@ PVOID mediaSenderRoutine(PVOID customData)
{
STATUS retStatus = STATUS_SUCCESS;
PSampleConfiguration pSampleConfiguration = (PSampleConfiguration) customData;
TID videoSenderTid = INVALID_TID_VALUE, audioSenderTid = INVALID_TID_VALUE;
CHK(pSampleConfiguration != NULL, STATUS_NULL_ARG);
pSampleConfiguration->videoSenderTid = INVALID_TID_VALUE;
pSampleConfiguration->audioSenderTid = INVALID_TID_VALUE;

MUTEX_LOCK(pSampleConfiguration->sampleConfigurationObjLock);
while (!ATOMIC_LOAD_BOOL(&pSampleConfiguration->connected) && !ATOMIC_LOAD_BOOL(&pSampleConfiguration->appTerminateFlag)) {
Expand All @@ -163,19 +172,19 @@ PVOID mediaSenderRoutine(PVOID customData)
CHK(!ATOMIC_LOAD_BOOL(&pSampleConfiguration->appTerminateFlag), retStatus);

if (pSampleConfiguration->videoSource != NULL) {
THREAD_CREATE(&videoSenderTid, pSampleConfiguration->videoSource, (PVOID) pSampleConfiguration);
THREAD_CREATE(&pSampleConfiguration->videoSenderTid, pSampleConfiguration->videoSource, (PVOID) pSampleConfiguration);
}

if (pSampleConfiguration->audioSource != NULL) {
THREAD_CREATE(&audioSenderTid, pSampleConfiguration->audioSource, (PVOID) pSampleConfiguration);
THREAD_CREATE(&pSampleConfiguration->audioSenderTid, pSampleConfiguration->audioSource, (PVOID) pSampleConfiguration);
}

if (videoSenderTid != INVALID_TID_VALUE) {
THREAD_JOIN(videoSenderTid, NULL);
if (pSampleConfiguration->videoSenderTid != INVALID_TID_VALUE) {
THREAD_JOIN(pSampleConfiguration->videoSenderTid, NULL);
}

if (audioSenderTid != INVALID_TID_VALUE) {
THREAD_JOIN(audioSenderTid, NULL);
if (pSampleConfiguration->audioSenderTid != INVALID_TID_VALUE) {
THREAD_JOIN(pSampleConfiguration->audioSenderTid, NULL);
}

CleanUp:
Expand Down Expand Up @@ -752,6 +761,8 @@ STATUS createSampleConfiguration(PCHAR channelName, SIGNALING_CHANNEL_ROLE_TYPE
#endif

pSampleConfiguration->mediaSenderTid = INVALID_TID_VALUE;
pSampleConfiguration->audioSenderTid = INVALID_TID_VALUE;
pSampleConfiguration->videoSenderTid = INVALID_TID_VALUE;
pSampleConfiguration->signalingClientHandle = INVALID_SIGNALING_CLIENT_HANDLE_VALUE;
pSampleConfiguration->sampleConfigurationObjLock = MUTEX_CREATE(TRUE);
pSampleConfiguration->cvar = CVAR_CREATE();
Expand Down Expand Up @@ -1166,10 +1177,18 @@ STATUS sessionCleanupWait(PSampleConfiguration pSampleConfiguration)
}

// Check if we need to re-create the signaling client on-the-fly
if (ATOMIC_LOAD_BOOL(&pSampleConfiguration->recreateSignalingClient) &&
STATUS_SUCCEEDED(signalingClientFetchSync(pSampleConfiguration->signalingClientHandle))) {
// Re-set the variable again
ATOMIC_STORE_BOOL(&pSampleConfiguration->recreateSignalingClient, FALSE);
if (ATOMIC_LOAD_BOOL(&pSampleConfiguration->recreateSignalingClient)) {
retStatus = signalingClientFetchSync(pSampleConfiguration->signalingClientHandle);
if (STATUS_SUCCEEDED(retStatus)) {
// Re-set the variable again
ATOMIC_STORE_BOOL(&pSampleConfiguration->recreateSignalingClient, FALSE);
} else if (signalingCallFailed(retStatus)) {
printf("[KVS Common] recreating Signaling Client\n");
freeSignalingClient(&pSampleConfiguration->signalingClientHandle);
createSignalingClientSync(&pSampleConfiguration->clientInfo, &pSampleConfiguration->channelInfo,
&pSampleConfiguration->signalingClientCallbacks, pSampleConfiguration->pCredentialProvider,
&pSampleConfiguration->signalingClientHandle);
}
}

// Check the signaling client state and connect if needed
Expand Down
2 changes: 2 additions & 0 deletions samples/Samples.h
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,8 @@ typedef struct {
PBYTE pVideoFrameBuffer;
UINT32 videoBufferSize;
TID mediaSenderTid;
TID audioSenderTid;
TID videoSenderTid;
TIMER_QUEUE_HANDLE timerQueueHandle;
UINT32 iceCandidatePairStatsTimerId;
SampleStreamingMediaType mediaType;
Expand Down
20 changes: 4 additions & 16 deletions samples/kvsWebRTCClientMaster.c
Original file line number Diff line number Diff line change
Expand Up @@ -128,20 +128,6 @@ INT32 main(INT32 argc, CHAR* argv[])
// Kick off the termination sequence
ATOMIC_STORE_BOOL(&pSampleConfiguration->appTerminateFlag, TRUE);

if (IS_VALID_MUTEX_VALUE(pSampleConfiguration->sampleConfigurationObjLock)) {
MUTEX_LOCK(pSampleConfiguration->sampleConfigurationObjLock);
}

// Cancel the media thread
if (pSampleConfiguration->mediaThreadStarted) {
DLOGD("Canceling media thread");
THREAD_CANCEL(pSampleConfiguration->mediaSenderTid);
}

if (IS_VALID_MUTEX_VALUE(pSampleConfiguration->sampleConfigurationObjLock)) {
MUTEX_UNLOCK(pSampleConfiguration->sampleConfigurationObjLock);
}

if (pSampleConfiguration->mediaSenderTid != INVALID_TID_VALUE) {
THREAD_JOIN(pSampleConfiguration->mediaSenderTid, NULL);
}
Expand All @@ -166,6 +152,7 @@ INT32 main(INT32 argc, CHAR* argv[])
}
}
printf("[KVS Master] Cleanup done\n");
CHK_LOG_ERR(retStatus);

RESET_INSTRUMENTED_ALLOCATORS();

Expand Down Expand Up @@ -289,7 +276,7 @@ PVOID sendVideoPackets(PVOID args)
}

CleanUp:

printf("[KVS Master] closing video thread");
CHK_LOG_ERR(retStatus);

return (PVOID) (ULONG_PTR) retStatus;
Expand Down Expand Up @@ -330,6 +317,7 @@ PVOID sendAudioPackets(PVOID args)
printf("[KVS Master] MEMREALLOC(): operation returned status code: 0x%08x \n", STATUS_NOT_ENOUGH_MEMORY);
goto CleanUp;
}
pSampleConfiguration->audioBufferSize = frameSize;
}

frame.frameData = pSampleConfiguration->pAudioFrameBuffer;
Expand Down Expand Up @@ -359,7 +347,7 @@ PVOID sendAudioPackets(PVOID args)
}

CleanUp:

printf("[KVS Master] closing audio thread");
return (PVOID) (ULONG_PTR) retStatus;
}

Expand Down
48 changes: 38 additions & 10 deletions src/source/Ice/IceAgent.c
Original file line number Diff line number Diff line change
Expand Up @@ -1229,10 +1229,20 @@ STATUS iceCandidatePairCheckConnection(PStunPacket pStunBindingRequest, PIceAgen
UINT32 checkSum = 0;

CHK(pStunBindingRequest != NULL && pIceAgent != NULL && pIceCandidatePair != NULL, STATUS_NULL_ARG);
CHK(pIceCandidatePair->local != NULL && pIceCandidatePair->remote != NULL, STATUS_NULL_ARG);
CHK_STATUS(getStunAttribute(pStunBindingRequest, STUN_ATTRIBUTE_TYPE_PRIORITY, (PStunAttributeHeader*) &pStunAttributePriority));

CHK(pStunAttributePriority != NULL, STATUS_INVALID_ARG);

// Debug aid: log both endpoints of the pair being checked. Only done for IPv4
// because the format prints exactly four address octets per endpoint.
// The arguments follow the format specifiers one-to-one: remote octets 0-3,
// remote port, local octets 0-3, local port.
// NOTE(review): the port is printed exactly as stored in ipAddress.port; if that
// field is kept in network byte order the logged number needs converting — confirm.
if (pIceCandidatePair->local->ipAddress.family == KVS_IP_FAMILY_TYPE_IPV4) {
    DLOGD("remote ip:%u.%u.%u.%u, port:%u, local ip:%u.%u.%u.%u, port:%u", pIceCandidatePair->remote->ipAddress.address[0],
          pIceCandidatePair->remote->ipAddress.address[1], pIceCandidatePair->remote->ipAddress.address[2],
          pIceCandidatePair->remote->ipAddress.address[3], pIceCandidatePair->remote->ipAddress.port,
          pIceCandidatePair->local->ipAddress.address[0], pIceCandidatePair->local->ipAddress.address[1],
          pIceCandidatePair->local->ipAddress.address[2], pIceCandidatePair->local->ipAddress.address[3],
          pIceCandidatePair->local->ipAddress.port);
}

// update priority and transaction id
pStunAttributePriority->priority = pIceCandidatePair->local->priority;
CHK_STATUS(iceUtilsGenerateTransactionId(pStunBindingRequest->header.transactionId, ARRAY_SIZE(pStunBindingRequest->header.transactionId)));
Expand Down Expand Up @@ -2247,11 +2257,12 @@ STATUS incomingRelayedDataHandler(UINT64 customData, PSocketConnection pSocketCo
STATUS retStatus = STATUS_SUCCESS;
PIceCandidate pRelayedCandidate = (PIceCandidate) customData;
// this should be more than enough. Usually the number of channel data in each tcp message is around 4
TurnChannelData turnChannelData[DEFAULT_TURN_CHANNEL_DATA_BUFFER_SIZE];
TurnChannelData turnChannelData[DEFAULT_TURN_CHANNEL_DATA_BUFFER_SIZE] = {0};
UINT32 turnChannelDataCount = ARRAY_SIZE(turnChannelData), i = 0;

CHK(pRelayedCandidate != NULL && pSocketConnection != NULL, STATUS_NULL_ARG);

DLOGV("Candidate id: %s", pRelayedCandidate->id);
CHK_STATUS(turnConnectionIncomingDataHandler(pRelayedCandidate->pTurnConnection, pBuffer, bufferLen, pSrc, pDest, turnChannelData,
&turnChannelDataCount));
for (i = 0; i < turnChannelDataCount; ++i) {
Expand Down Expand Up @@ -2282,6 +2293,7 @@ STATUS incomingDataHandler(UINT64 customData, PSocketConnection pSocketConnectio
// for stun packets, first 8 bytes are 4 byte type and length, then 4 byte magic byte
if ((bufferLen < 8 || !IS_STUN_PACKET(pBuffer)) && pIceAgent->iceAgentCallbacks.inboundPacketFn != NULL) {
// release lock early

MUTEX_UNLOCK(pIceAgent->lock);
locked = FALSE;
pIceAgent->iceAgentCallbacks.inboundPacketFn(pIceAgent->iceAgentCallbacks.customData, pBuffer, bufferLen);
Expand Down Expand Up @@ -2441,7 +2453,9 @@ STATUS handleStunPacket(PIceAgent pIceAgent, PBYTE pBuffer, UINT32 bufferLen, PS
pIceAgent->rtcIceServerDiagnostics[pIceCandidate->iceServerIndex].totalResponsesReceived++;
retStatus = hashTableGet(pIceAgent->requestTimestampDiagnostics, checkSum, &requestSentTime);
if (retStatus != STATUS_SUCCESS) {
DLOGW("Unable to fetch request Timestamp from the hash table. No update to totalRoundTripTime (error code: 0x%08x)", retStatus);
DLOGW("Unable to fetch request Timestamp from the hash table. No update to totalRoundTripTime (error code: 0x%08x), "
"stunBindingRequest",
retStatus);
} else {
pIceAgent->rtcIceServerDiagnostics[pIceCandidate->iceServerIndex].totalRoundTripTime += GETTIME() - requestSentTime;
CHK_STATUS(hashTableRemove(pIceAgent->requestTimestampDiagnostics, checkSum));
Expand Down Expand Up @@ -2469,6 +2483,7 @@ STATUS handleStunPacket(PIceAgent pIceAgent, PBYTE pBuffer, UINT32 bufferLen, PS
"Cannot find candidate pair with local candidate %s and remote candidate %s. Dropping STUN binding success response",
ipAddrStr2, ipAddrStr);
}
DLOGV("Pair binding response! %s %s", pIceCandidatePair->local->id, pIceCandidatePair->remote->id);
retStatus = hashTableGet(pIceCandidatePair->requestSentTime, checkSum, &requestSentTime);
if (retStatus != STATUS_SUCCESS) {
DLOGW("Unable to fetch request Timestamp from the hash table. No update to RTT for the pair (error code: 0x%08x)", retStatus);
Expand All @@ -2485,7 +2500,8 @@ STATUS handleStunPacket(PIceAgent pIceAgent, PBYTE pBuffer, UINT32 bufferLen, PS
pIceAgent->rtcIceServerDiagnostics[pIceCandidatePair->local->iceServerIndex].totalResponsesReceived++;
retStatus = hashTableGet(pIceAgent->requestTimestampDiagnostics, checkSum, &requestSentTime);
if (retStatus != STATUS_SUCCESS) {
DLOGW("Unable to fetch request Timestamp from the hash table. No update to totalRoundTripTime (error code: 0x%08x)", retStatus);
DLOGW("Unable to fetch request Timestamp from the hash table. No update to totalRoundTripTime (error code: 0x%08x), typeRelayed",
retStatus);
} else {
pIceAgent->rtcIceServerDiagnostics[pIceCandidatePair->local->iceServerIndex].totalRoundTripTime += GETTIME() - requestSentTime;
CHK_STATUS(hashTableRemove(pIceAgent->requestTimestampDiagnostics, checkSum));
Expand Down Expand Up @@ -2513,10 +2529,13 @@ STATUS handleStunPacket(PIceAgent pIceAgent, PBYTE pBuffer, UINT32 bufferLen, PS
}

if (pIceCandidatePair->state != ICE_CANDIDATE_PAIR_STATE_SUCCEEDED) {
DLOGV("Pair succeeded! %s %s", pIceCandidatePair->local->id, pIceCandidatePair->remote->id);
pIceCandidatePair->state = ICE_CANDIDATE_PAIR_STATE_SUCCEEDED;
retStatus = hashTableGet(pIceCandidatePair->requestSentTime, checkSum, &requestSentTime);
if (retStatus != STATUS_SUCCESS) {
DLOGW("Unable to fetch request Timestamp from the hash table. No update to totalRoundTripTime (error code: 0x%08x)", retStatus);
DLOGW(
"Unable to fetch request Timestamp from the hash table. No update to totalRoundTripTime (error code: 0x%08x), stateSucceeded",
retStatus);
} else {
pIceCandidatePair->roundTripTime = GETTIME() - requestSentTime;
DLOGD("Ice candidate pair %s_%s is connected. Round trip time: %" PRIu64 "ms", pIceCandidatePair->local->id,
Expand All @@ -2537,12 +2556,21 @@ STATUS handleStunPacket(PIceAgent pIceAgent, PBYTE pBuffer, UINT32 bufferLen, PS
break;

default:
CHK_STATUS(hexEncode(pBuffer, bufferLen, NULL, &hexStrLen));
hexStr = MEMCALLOC(1, hexStrLen * SIZEOF(CHAR));
CHK(hexStr != NULL, STATUS_NOT_ENOUGH_MEMORY);
CHK_STATUS(hexEncode(pBuffer, bufferLen, hexStr, &hexStrLen));
DLOGW("Dropping unrecognized STUN packet. Packet type: 0x%02x. Packet content: \n\t%s", stunPacketType, hexStr);
SAFE_MEMFREE(hexStr);
if (!IS_STUN_PACKET(pBuffer)) {
CHK_STATUS(hexEncode(pBuffer, bufferLen, NULL, &hexStrLen));
hexStr = MEMCALLOC(1, hexStrLen * SIZEOF(CHAR));
CHK(hexStr != NULL, STATUS_NOT_ENOUGH_MEMORY);
CHK_STATUS(hexEncode(pBuffer, bufferLen, hexStr, &hexStrLen));
DLOGW("Dropping unrecognized STUN packet. Packet type: 0x%02x. Packet content: \n\t%s", stunPacketType, hexStr);
SAFE_MEMFREE(hexStr);
} else if (STUN_PACKET_IS_TYPE_ERROR(pBuffer)) {
CHK_STATUS(hexEncode(pBuffer, bufferLen, NULL, &hexStrLen));
hexStr = MEMCALLOC(1, hexStrLen * SIZEOF(CHAR));
CHK(hexStr != NULL, STATUS_NOT_ENOUGH_MEMORY);
CHK_STATUS(hexEncode(pBuffer, bufferLen, hexStr, &hexStrLen));
DLOGW("Error STUN packet. Packet type: 0x%02x. Packet content: \n\t%s", stunPacketType, hexStr);
SAFE_MEMFREE(hexStr);
}
break;
}

Expand Down
1 change: 1 addition & 0 deletions src/source/Ice/IceAgentStateMachine.c
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,7 @@ STATUS fromCheckConnectionIceAgentState(UINT64 customData, PUINT64 pState)
CHK_STATUS(doubleListGetHeadNode(pIceAgent->iceCandidatePairs, &pCurNode));
while (pCurNode != NULL && !connectedCandidatePairFound) {
pIceCandidatePair = (PIceCandidatePair) pCurNode->data;
DLOGD("Checking pair: %s %s, state: %d", pIceCandidatePair->local->id, pIceCandidatePair->remote->id, pIceCandidatePair->state);
pCurNode = pCurNode->pNext;

if (pIceCandidatePair->state == ICE_CANDIDATE_PAIR_STATE_SUCCEEDED) {
Expand Down
15 changes: 14 additions & 1 deletion src/source/Ice/TurnConnection.c
Original file line number Diff line number Diff line change
Expand Up @@ -352,6 +352,7 @@ STATUS turnConnectionHandleStunError(PTurnConnection pTurnConnection, PBYTE pBuf
MUTEX_LOCK(pTurnConnection->lock);
locked = TRUE;

stunPacketType = (UINT16) getInt16(*((PUINT16) pBuffer));
/* we could get errors like expired nonce after sending the deallocation packet. The allocate would have been
* deallocated even if the response is error response, and if we try to deallocate again we would get invalid
* allocation error. Therefore if we get an error after we've sent the deallocation packet, consider the
Expand Down Expand Up @@ -450,9 +451,10 @@ STATUS turnConnectionHandleChannelData(PTurnConnection pTurnConnection, PBYTE pB
STATUS retStatus = STATUS_SUCCESS;
BOOL locked = FALSE;

UINT32 turnChannelDataCount = 0;
UINT32 turnChannelDataCount = 0, hexStrLen = 0;
UINT16 channelNumber = 0;
PTurnPeer pTurnPeer = NULL;
PCHAR hexStr = NULL;

CHK(pTurnConnection != NULL && pChannelData != NULL && pChannelDataCount != NULL && pProcessedDataLen != NULL, STATUS_NULL_ARG);
CHK(pBuffer != NULL && bufferLen > 0, STATUS_INVALID_ARG);
Expand All @@ -478,7 +480,14 @@ STATUS turnConnectionHandleChannelData(PTurnConnection pTurnConnection, PBYTE pB
}

} else {
CHK_STATUS(hexEncode(pBuffer, bufferLen, NULL, &hexStrLen));
hexStr = MEMCALLOC(1, hexStrLen * SIZEOF(CHAR));
CHK(hexStr != NULL, STATUS_NOT_ENOUGH_MEMORY);
CHK_STATUS(hexEncode(pBuffer, bufferLen, hexStr, &hexStrLen));
DLOGE("Turn connection does not have channel number, dumping payload: %s", hexStr);
turnChannelDataCount = 0;

SAFE_MEMFREE(hexStr);
}
*pProcessedDataLen = bufferLen;

Expand All @@ -493,6 +502,9 @@ STATUS turnConnectionHandleChannelData(PTurnConnection pTurnConnection, PBYTE pB

CHK_LOG_ERR(retStatus);

if (hexStr != NULL) {
SAFE_MEMFREE(hexStr);
}
if (locked) {
MUTEX_UNLOCK(pTurnConnection->lock);
}
Expand Down Expand Up @@ -524,6 +536,7 @@ STATUS turnConnectionHandleChannelDataTcpMode(PTurnConnection pTurnConnection, P
/* process only one channel data and return. Because channel data can be intermixed with STUN packet.
* need to check remainingBufLen too because channel data could be incomplete. */
while (remainingBufLen != 0 && channelDataCount == 0) {
DLOGV("currRecvDataLen: %d", pTurnConnection->currRecvDataLen);
if (pTurnConnection->currRecvDataLen != 0) {
if (pTurnConnection->currRecvDataLen >= TURN_DATA_CHANNEL_SEND_OVERHEAD) {
/* pTurnConnection->recvDataBuffer always has channel data start */
Expand Down