Skip to content

Commit

Permalink
No more detaching threads in tests (#1889)
Browse files Browse the repository at this point in the history
* No more detaching threads in tests

* Missed joining in one test

* lock around adding thread to vector

* terminate adding threads flag

* reset bool in tests

* Change on Ice handler after done with test to stop accessing out of scope variables after scope is closed

* unused params
  • Loading branch information
jdelapla authored Dec 18, 2023
1 parent b82923c commit 03c85cb
Show file tree
Hide file tree
Showing 3 changed files with 166 additions and 45 deletions.
104 changes: 79 additions & 25 deletions tst/PeerConnectionFunctionalityTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,27 +60,43 @@ TEST_F(PeerConnectionFunctionalityTest, connectTwoPeersWithDelay)
RtcSessionDescriptionInit sdp;
SIZE_T connectedCount = 0;
PRtcPeerConnection offerPc = NULL, answerPc = NULL;
PeerContainer offer;
PeerContainer answer;

MEMSET(&configuration, 0x00, SIZEOF(RtcConfiguration));

EXPECT_EQ(createPeerConnection(&configuration, &offerPc), STATUS_SUCCESS);
EXPECT_EQ(createPeerConnection(&configuration, &answerPc), STATUS_SUCCESS);

auto onICECandidateHdlr = [](UINT64 customData, PCHAR candidateStr) -> void {
PPeerContainer container = (PPeerContainer)customData;
if (candidateStr != NULL) {
std::thread(
[customData](std::string candidate) {
RtcIceCandidateInit iceCandidate;
EXPECT_EQ(STATUS_SUCCESS, deserializeRtcIceCandidateInit((PCHAR) candidate.c_str(), STRLEN(candidate.c_str()), &iceCandidate));
EXPECT_EQ(STATUS_SUCCESS, addIceCandidate((PRtcPeerConnection) customData, iceCandidate.candidate));
},
std::string(candidateStr))
.detach();
container->client->lock.lock();
if(!container->client->noNewThreads) {
container->client->threads.push_back(std::thread(
[container](std::string candidate) {
RtcIceCandidateInit iceCandidate;
EXPECT_EQ(STATUS_SUCCESS, deserializeRtcIceCandidateInit((PCHAR) candidate.c_str(), STRLEN(candidate.c_str()), &iceCandidate));
EXPECT_EQ(STATUS_SUCCESS, addIceCandidate((PRtcPeerConnection) container->pc, iceCandidate.candidate));
},
std::string(candidateStr)));
}
container->client->lock.unlock();
}
};

EXPECT_EQ(STATUS_SUCCESS, peerConnectionOnIceCandidate(offerPc, (UINT64) answerPc, onICECandidateHdlr));
EXPECT_EQ(STATUS_SUCCESS, peerConnectionOnIceCandidate(answerPc, (UINT64) offerPc, onICECandidateHdlr));
offer.pc = offerPc;
offer.client = this;
answer.pc = answerPc;
answer.client = this;

auto onICECandidateHdlrDone = [](UINT64 customData, PCHAR candidateStr) -> void {
UNUSED_PARAM(customData);
UNUSED_PARAM(candidateStr);
};

EXPECT_EQ(STATUS_SUCCESS, peerConnectionOnIceCandidate(offerPc, (UINT64) &answer, onICECandidateHdlr));
EXPECT_EQ(STATUS_SUCCESS, peerConnectionOnIceCandidate(answerPc, (UINT64) &offer, onICECandidateHdlr));

auto onICEConnectionStateChangeHdlr = [](UINT64 customData, RTC_PEER_CONNECTION_STATE newState) -> void {
if (newState == RTC_PEER_CONNECTION_STATE_CONNECTED) {
Expand Down Expand Up @@ -108,6 +124,17 @@ TEST_F(PeerConnectionFunctionalityTest, connectTwoPeersWithDelay)

EXPECT_EQ(2, connectedCount);

this->lock.lock();
//join all threads before leaving
for (auto& th : this->threads) th.join();

this->threads.clear();
this->noNewThreads = TRUE;
this->lock.unlock();

EXPECT_EQ(STATUS_SUCCESS, peerConnectionOnIceCandidate(offerPc, (UINT64) 0, onICECandidateHdlrDone));
EXPECT_EQ(STATUS_SUCCESS, peerConnectionOnIceCandidate(answerPc, (UINT64) 0, onICECandidateHdlrDone));

closePeerConnection(offerPc);
closePeerConnection(answerPc);

Expand Down Expand Up @@ -635,6 +662,9 @@ TEST_F(PeerConnectionFunctionalityTest, noLostFramesAfterConnected)
ATOMIC_BOOL seenFirstFrame = FALSE;
Frame videoFrame;

PeerContainer offer;
PeerContainer answer;

MEMSET(&configuration, 0x00, SIZEOF(RtcConfiguration));
MEMSET(&videoFrame, 0x00, SIZEOF(Frame));

Expand All @@ -654,6 +684,33 @@ TEST_F(PeerConnectionFunctionalityTest, noLostFramesAfterConnected)
addTrackToPeerConnection(offerPc, &offerVideoTrack, &offerVideoTransceiver, RTC_CODEC_VP8, MEDIA_STREAM_TRACK_KIND_VIDEO);
addTrackToPeerConnection(answerPc, &answerVideoTrack, &answerVideoTransceiver, RTC_CODEC_VP8, MEDIA_STREAM_TRACK_KIND_VIDEO);

auto onICECandidateHdlr = [](UINT64 customData, PCHAR candidateStr) -> void {
PPeerContainer container = (PPeerContainer)customData;
if (candidateStr != NULL) {
container->client->lock.lock();
if(!container->client->noNewThreads) {
container->client->threads.push_back(std::thread(
[container](std::string candidate) {
RtcIceCandidateInit iceCandidate;
EXPECT_EQ(STATUS_SUCCESS, deserializeRtcIceCandidateInit((PCHAR) candidate.c_str(), STRLEN(candidate.c_str()), &iceCandidate));
EXPECT_EQ(STATUS_SUCCESS, addIceCandidate((PRtcPeerConnection) container->pc, iceCandidate.candidate));
},
std::string(candidateStr)));
}
container->client->lock.unlock();
}
};

offer.pc = offerPc;
offer.client = this;
answer.pc = answerPc;
answer.client = this;

auto onICECandidateHdlrDone = [](UINT64 customData, PCHAR candidateStr) -> void {
UNUSED_PARAM(customData);
UNUSED_PARAM(candidateStr);
};

auto onFrameHandler = [](UINT64 customData, PFrame pFrame) -> void {
UNUSED_PARAM(pFrame);
if (pFrame->frameData[0] == 1) {
Expand All @@ -662,21 +719,8 @@ TEST_F(PeerConnectionFunctionalityTest, noLostFramesAfterConnected)
};
EXPECT_EQ(transceiverOnFrame(answerVideoTransceiver, (UINT64) &seenFirstFrame, onFrameHandler), STATUS_SUCCESS);

auto onICECandidateHdlr = [](UINT64 customData, PCHAR candidateStr) -> void {
if (candidateStr != NULL) {
std::thread(
[customData](std::string candidate) {
RtcIceCandidateInit iceCandidate;
EXPECT_EQ(STATUS_SUCCESS, deserializeRtcIceCandidateInit((PCHAR) candidate.c_str(), STRLEN(candidate.c_str()), &iceCandidate));
EXPECT_EQ(STATUS_SUCCESS, addIceCandidate((PRtcPeerConnection) customData, iceCandidate.candidate));
},
std::string(candidateStr))
.detach();
}
};

EXPECT_EQ(STATUS_SUCCESS, peerConnectionOnIceCandidate(offerPc, (UINT64) answerPc, onICECandidateHdlr));
EXPECT_EQ(STATUS_SUCCESS, peerConnectionOnIceCandidate(answerPc, (UINT64) offerPc, onICECandidateHdlr));
EXPECT_EQ(STATUS_SUCCESS, peerConnectionOnIceCandidate(offerPc, (UINT64) &answer, onICECandidateHdlr));
EXPECT_EQ(STATUS_SUCCESS, peerConnectionOnIceCandidate(answerPc, (UINT64) &offer, onICECandidateHdlr));

auto onICEConnectionStateChangeHdlr = [](UINT64 customData, RTC_PEER_CONNECTION_STATE newState) -> void {
Context* pContext = (Context*) customData;
Expand Down Expand Up @@ -714,6 +758,16 @@ TEST_F(PeerConnectionFunctionalityTest, noLostFramesAfterConnected)
THREAD_SLEEP(HUNDREDS_OF_NANOS_IN_A_MILLISECOND);
}

this->lock.lock();
for (auto& th : this->threads) th.join();

this->threads.clear();
this->noNewThreads = TRUE;
this->lock.unlock();

EXPECT_EQ(STATUS_SUCCESS, peerConnectionOnIceCandidate(offerPc, (UINT64) 0, onICECandidateHdlrDone));
EXPECT_EQ(STATUS_SUCCESS, peerConnectionOnIceCandidate(answerPc, (UINT64) 0, onICECandidateHdlrDone));

MEMFREE(videoFrame.frameData);
closePeerConnection(offerPc);
closePeerConnection(answerPc);
Expand Down
99 changes: 79 additions & 20 deletions tst/WebRTCClientTestFixture.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ void WebRtcClientTestBase::SetUp()
mDroppedFrameIndex = 0;
mExpectedFrameCount = 0;
mExpectedDroppedFrameCount = 0;
noNewThreads = FALSE;

SET_INSTRUMENTED_ALLOCATORS();

Expand Down Expand Up @@ -218,22 +219,40 @@ bool WebRtcClientTestBase::connectTwoPeers(PRtcPeerConnection offerPc, PRtcPeerC
PCHAR pAnswerCertFingerprint)
{
RtcSessionDescriptionInit sdp;
PeerContainer offer;
PeerContainer answer;
this->noNewThreads = FALSE;

auto onICECandidateHdlr = [](UINT64 customData, PCHAR candidateStr) -> void {
PPeerContainer container = (PPeerContainer)customData;
if (candidateStr != NULL) {
std::thread(
[customData](std::string candidate) {
RtcIceCandidateInit iceCandidate;
EXPECT_EQ(STATUS_SUCCESS, deserializeRtcIceCandidateInit((PCHAR) candidate.c_str(), STRLEN(candidate.c_str()), &iceCandidate));
EXPECT_EQ(STATUS_SUCCESS, addIceCandidate((PRtcPeerConnection) customData, iceCandidate.candidate));
},
std::string(candidateStr))
.detach();
container->client->lock.lock();
if(!container->client->noNewThreads) {
container->client->threads.push_back(std::thread(
[container](std::string candidate) {
RtcIceCandidateInit iceCandidate;
EXPECT_EQ(STATUS_SUCCESS, deserializeRtcIceCandidateInit((PCHAR) candidate.c_str(), STRLEN(candidate.c_str()), &iceCandidate));
EXPECT_EQ(STATUS_SUCCESS, addIceCandidate((PRtcPeerConnection) container->pc, iceCandidate.candidate));
},
std::string(candidateStr)));
}
container->client->lock.unlock();
}

};

auto onICECandidateHdlrDone = [](UINT64 customData, PCHAR candidateStr) -> void {
UNUSED_PARAM(customData);
UNUSED_PARAM(candidateStr);
};

EXPECT_EQ(STATUS_SUCCESS, peerConnectionOnIceCandidate(offerPc, (UINT64) answerPc, onICECandidateHdlr));
EXPECT_EQ(STATUS_SUCCESS, peerConnectionOnIceCandidate(answerPc, (UINT64) offerPc, onICECandidateHdlr));
offer.pc = offerPc;
offer.client = this;
answer.pc = answerPc;
answer.client = this;

EXPECT_EQ(STATUS_SUCCESS, peerConnectionOnIceCandidate(offerPc, (UINT64) &answer, onICECandidateHdlr));
EXPECT_EQ(STATUS_SUCCESS, peerConnectionOnIceCandidate(answerPc, (UINT64) &offer, onICECandidateHdlr));

auto onICEConnectionStateChangeHdlr = [](UINT64 customData, RTC_PEER_CONNECTION_STATE newState) -> void {
ATOMIC_INCREMENT((PSIZE_T) customData + newState);
Expand Down Expand Up @@ -263,29 +282,58 @@ bool WebRtcClientTestBase::connectTwoPeers(PRtcPeerConnection offerPc, PRtcPeerC
THREAD_SLEEP(HUNDREDS_OF_NANOS_IN_A_SECOND);
}

this->lock.lock();
//join all threads before leaving
for (auto& th : this->threads) th.join();

this->threads.clear();
this->noNewThreads = TRUE;
this->lock.unlock();

EXPECT_EQ(STATUS_SUCCESS, peerConnectionOnIceCandidate(offerPc, (UINT64) 0, onICECandidateHdlrDone));
EXPECT_EQ(STATUS_SUCCESS, peerConnectionOnIceCandidate(answerPc, (UINT64) 0, onICECandidateHdlrDone));


return ATOMIC_LOAD(&this->stateChangeCount[RTC_PEER_CONNECTION_STATE_CONNECTED]) == 2;
}

bool WebRtcClientTestBase::connectTwoPeersAsyncIce(PRtcPeerConnection offerPc, PRtcPeerConnection answerPc, PCHAR pOfferCertFingerprint,
PCHAR pAnswerCertFingerprint)
{
RtcSessionDescriptionInit sdp;
PeerContainer offer;
PeerContainer answer;
this->noNewThreads = FALSE;

auto onICECandidateHdlr = [](UINT64 customData, PCHAR candidateStr) -> void {
PPeerContainer container = (PPeerContainer)customData;
if (candidateStr != NULL) {
std::thread(
[customData](std::string candidate) {
RtcIceCandidateInit iceCandidate;
EXPECT_EQ(STATUS_SUCCESS, deserializeRtcIceCandidateInit((PCHAR) candidate.c_str(), STRLEN(candidate.c_str()), &iceCandidate));
EXPECT_EQ(STATUS_SUCCESS, addIceCandidate((PRtcPeerConnection) customData, iceCandidate.candidate));
},
std::string(candidateStr))
.detach();
container->client->lock.lock();
if(!container->client->noNewThreads) {
container->client->threads.push_back(std::thread(
[container](std::string candidate) {
RtcIceCandidateInit iceCandidate;
EXPECT_EQ(STATUS_SUCCESS, deserializeRtcIceCandidateInit((PCHAR) candidate.c_str(), STRLEN(candidate.c_str()), &iceCandidate));
EXPECT_EQ(STATUS_SUCCESS, addIceCandidate((PRtcPeerConnection) container->pc, iceCandidate.candidate));
},
std::string(candidateStr)));
}
container->client->lock.unlock();
}
};

EXPECT_EQ(STATUS_SUCCESS, peerConnectionOnIceCandidate(offerPc, (UINT64) answerPc, onICECandidateHdlr));
EXPECT_EQ(STATUS_SUCCESS, peerConnectionOnIceCandidate(answerPc, (UINT64) offerPc, onICECandidateHdlr));
auto onICECandidateHdlrDone = [](UINT64 customData, PCHAR candidateStr) -> void {
UNUSED_PARAM(customData);
UNUSED_PARAM(candidateStr);
};

offer.pc = offerPc;
offer.client = this;
answer.pc = answerPc;
answer.client = this;

EXPECT_EQ(STATUS_SUCCESS, peerConnectionOnIceCandidate(offerPc, (UINT64) &answer, onICECandidateHdlr));
EXPECT_EQ(STATUS_SUCCESS, peerConnectionOnIceCandidate(answerPc, (UINT64) &offer, onICECandidateHdlr));

auto onICEConnectionStateChangeHdlr = [](UINT64 customData, RTC_PEER_CONNECTION_STATE newState) -> void {
ATOMIC_INCREMENT((PSIZE_T) customData + newState);
Expand Down Expand Up @@ -317,6 +365,17 @@ bool WebRtcClientTestBase::connectTwoPeersAsyncIce(PRtcPeerConnection offerPc, P
THREAD_SLEEP(HUNDREDS_OF_NANOS_IN_A_SECOND);
}

this->lock.lock();
//join all threads before leaving
for (auto& th : this->threads) th.join();

this->threads.clear();
this->noNewThreads = TRUE;
this->lock.unlock();

EXPECT_EQ(STATUS_SUCCESS, peerConnectionOnIceCandidate(offerPc, (UINT64) 0, onICECandidateHdlrDone));
EXPECT_EQ(STATUS_SUCCESS, peerConnectionOnIceCandidate(answerPc, (UINT64) 0, onICECandidateHdlrDone));

return ATOMIC_LOAD(&this->stateChangeCount[RTC_PEER_CONNECTION_STATE_CONNECTED]) == 2;
}

Expand Down
8 changes: 8 additions & 0 deletions tst/WebRTCClientTestFixture.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ class WebRtcClientTestBase : public ::testing::Test {
UINT32 mRtpPacketCount;
UINT32 mUriCount = 0;
SIGNALING_CLIENT_HANDLE mSignalingClientHandle;
std::vector<std::thread> threads;
std::mutex lock;
BOOL noNewThreads = FALSE;

WebRtcClientTestBase();

Expand Down Expand Up @@ -339,6 +342,11 @@ class WebRtcClientTestBase : public ::testing::Test {
Tag mTags[3];
};

typedef struct {
PRtcPeerConnection pc;
WebRtcClientTestBase* client;
} PeerContainer, *PPeerContainer;

} // namespace webrtcclient
} // namespace video
} // namespace kinesis
Expand Down

0 comments on commit 03c85cb

Please sign in to comment.