From 707115e481c487d806cba93769c6b3fe8b73a92f Mon Sep 17 00:00:00 2001 From: Elite Encoder Date: Mon, 13 Feb 2023 00:05:29 -0500 Subject: [PATCH] Added test for stream failover across transcoders --- core/orch_test.go | 57 +++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 57 insertions(+) diff --git a/core/orch_test.go b/core/orch_test.go index 23453dcb2c..a566ffe5b3 100644 --- a/core/orch_test.go +++ b/core/orch_test.go @@ -419,6 +419,63 @@ func TestRemoveFromRemoteTranscoders(t *testing.T) { assert.Equal(remoteTranscoderList, emptyTList) } +//Validates that selectTranscoder will shift all streams to another valid transcoder if transcoder fails. +func TestRemoveFromRemoteTranscoders_StreamFailover(t *testing.T) { + rtm := NewRemoteTranscoderManager() + assert := assert.New(t) + capabilities := NewCapabilities(append(DefaultCapabilities(), OptionalCapabilities()...), []Capability{}) + + var transcoder_1 = &StubTranscoderServer{manager: rtm} + var transcoder_2 = &StubTranscoderServer{manager: rtm} + + assert.Nil(rtm.liveTranscoders[transcoder_1]) + assert.Empty(rtm.remoteTranscoders) + + // register 2 transcoders, 8 stream capacity each + go func() { rtm.Manage(transcoder_1, 8, capabilities.ToNetCapabilities()) }() + time.Sleep(1 * time.Millisecond) // allow time for first stream to register + go func() { rtm.Manage(transcoder_2, 8, capabilities.ToNetCapabilities()) }() + time.Sleep(1 * time.Millisecond) // allow time for second stream to register + + //Validate two transcoders exist + assert.NotNil(rtm.liveTranscoders[transcoder_1]) + assert.NotNil(rtm.liveTranscoders[transcoder_2]) + assert.Len(rtm.remoteTranscoders, 2) + + //Load up streams on transcoders + for i := 0; i < 8; i++ { + testSessionId := fmt.Sprintf("%s%d", "testID", i) + rtm.selectTranscoder(testSessionId, nil) + time.Sleep(1 * time.Millisecond) // allow time for second stream to register + } + + t1 := rtm.liveTranscoders[transcoder_1] + t2 := rtm.liveTranscoders[transcoder_2] + + //Validate 4 streams per transcoder + assert.Equal(4, t1.load) + assert.Equal(4, t2.load) + + //Break connection to transcoder by deleting live transcoder map entry + rtm.RTmutex.Lock() + delete(rtm.liveTranscoders, t1.stream) + rtm.RTmutex.Unlock() + + //Repeat selectTranscoder for existing session ids - will remove transcoder and shift streams to another transcoder + for i := 0; i < 8; i++ { + testSessionId := fmt.Sprintf("%s%d", "testID", i) + rtm.selectTranscoder(testSessionId, nil) + time.Sleep(1 * time.Millisecond) // allow time for second stream to register + } + + t1 = rtm.liveTranscoders[transcoder_1] + t2 = rtm.liveTranscoders[transcoder_2] + + //Validate all 8 streams on opposite transcoder + assert.Empty(t1) + assert.Equal(8, t2.load) +} + func TestTranscoderManagerTranscoding(t *testing.T) { m := NewRemoteTranscoderManager() s := &StubTranscoderServer{manager: m}