From 5740163c1fa9ccd7dee5dfd3d619922e9dd3c106 Mon Sep 17 00:00:00 2001 From: Elite Encoder Date: Mon, 6 Feb 2023 23:57:56 -0500 Subject: [PATCH 1/8] Fixed a transcoding bug that occured when remote transcoder was removed --- core/orchestrator.go | 17 +++-------------- 1 file changed, 3 insertions(+), 14 deletions(-) diff --git a/core/orchestrator.go b/core/orchestrator.go index 73c59930f9..29dc9f4c91 100644 --- a/core/orchestrator.go +++ b/core/orchestrator.go @@ -917,21 +917,10 @@ func removeFromRemoteTranscoders(rt *RemoteTranscoder, remoteTranscoders []*Remo return remoteTranscoders } - lastIndex := len(remoteTranscoders) - 1 - last := remoteTranscoders[lastIndex] - if rt == last { - return remoteTranscoders[:lastIndex] - } - newRemoteTs := make([]*RemoteTranscoder, 0) - for i, t := range remoteTranscoders { - if t == rt { - if i == 0 { - return remoteTranscoders[1:] - } - newRemoteTs = remoteTranscoders[:i] - newRemoteTs = append(newRemoteTs, remoteTranscoders[i+1:]...) - break + for _, t := range remoteTranscoders { + if t != rt { + newRemoteTs = append(newRemoteTs, t) } } return newRemoteTs From b8d59df0f38fb80846d393fbad9749fcd1c1c990 Mon Sep 17 00:00:00 2001 From: Elite Encoder Date: Tue, 7 Feb 2023 00:48:25 -0500 Subject: [PATCH 2/8] Add changelog --- CHANGELOG_PENDING.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG_PENDING.md b/CHANGELOG_PENDING.md index 48a32347b0..008b4dbe45 100644 --- a/CHANGELOG_PENDING.md +++ b/CHANGELOG_PENDING.md @@ -15,13 +15,13 @@ #### Transcoder ### Bug Fixes 🐞 +- #2746 Fixed a transcoding bug that occured when remote transcoder was removed #### CLI #### General #### Broadcaster -- \#2709 Add logging for high keyframe interval, reduce log level for discovery loop - \#2684 Fix transcode success rate metric #### Orchestrator From 714794cc9437e3f6fca7805473f531603ff300d2 Mon Sep 17 00:00:00 2001 From: Elite Encoder Date: Tue, 7 Feb 2023 01:05:05 -0500 Subject: [PATCH 3/8] Update changlog --- CHANGELOG_PENDING.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG_PENDING.md b/CHANGELOG_PENDING.md index 008b4dbe45..1da0240532 100644 --- a/CHANGELOG_PENDING.md +++ b/CHANGELOG_PENDING.md @@ -15,7 +15,7 @@ #### Transcoder ### Bug Fixes 🐞 -- #2746 Fixed a transcoding bug that occured when remote transcoder was removed +- #2746 Fixed a transcoding bug that occurred when remote transcoder was removed #### CLI From 1bb912e38a2c83e033df30e9ad857e4650e278dc Mon Sep 17 00:00:00 2001 From: Elite Encoder Date: Tue, 7 Feb 2023 01:05:05 -0500 Subject: [PATCH 4/8] Updated changelog, fixed comment --- core/orchestrator.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/orchestrator.go b/core/orchestrator.go index 29dc9f4c91..a44fa49c7f 100644 --- a/core/orchestrator.go +++ b/core/orchestrator.go @@ -913,7 +913,7 @@ func (rtm *RemoteTranscoderManager) Manage(stream net.Transcoder_RegisterTransco func removeFromRemoteTranscoders(rt *RemoteTranscoder, remoteTranscoders []*RemoteTranscoder) []*RemoteTranscoder { if len(remoteTranscoders) == 0 { - // No transocerds to remove, return + // No transcoders to remove, return return remoteTranscoders } From 122f57222136cc08b0ae7eabdcbce92fafe21820 Mon Sep 17 00:00:00 2001 From: Elite Encoder Date: Tue, 7 Feb 2023 01:05:05 -0500 Subject: [PATCH 5/8] Updated changelog, fixed comment --- CHANGELOG_PENDING.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG_PENDING.md b/CHANGELOG_PENDING.md index 1da0240532..6ecce5f5e6 100644 --- a/CHANGELOG_PENDING.md +++ b/CHANGELOG_PENDING.md @@ -15,7 +15,7 @@ #### Transcoder ### Bug Fixes 🐞 -- #2746 Fixed a transcoding bug that occurred when remote transcoder was removed +- #2747 Fixed a transcoding bug that occurred when remote transcoder was removed #### CLI From 707115e481c487d806cba93769c6b3fe8b73a92f Mon Sep 17 00:00:00 2001 From: Elite Encoder Date: Mon, 13 Feb 2023 00:05:29 -0500 Subject: [PATCH 6/8] Added test for stream failover across transcoders --- core/orch_test.go | 57 +++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 57 insertions(+) diff --git a/core/orch_test.go b/core/orch_test.go index 23453dcb2c..a566ffe5b3 100644 --- a/core/orch_test.go +++ b/core/orch_test.go @@ -419,6 +419,63 @@ func TestRemoveFromRemoteTranscoders(t *testing.T) { assert.Equal(remoteTranscoderList, emptyTList) } +//Validates that selectTranscoder will shift all streams to another valid transcoder if transcoder fails. +func TestRemoveFromRemoteTranscoders_StreamFailover(t *testing.T) { + rtm := NewRemoteTranscoderManager() + assert := assert.New(t) + capabilities := NewCapabilities(append(DefaultCapabilities(), OptionalCapabilities()...), []Capability{}) + + var transcoder_1 = &StubTranscoderServer{manager: rtm} + var transcoder_2 = &StubTranscoderServer{manager: rtm} + + assert.Nil(rtm.liveTranscoders[transcoder_1]) + assert.Empty(rtm.remoteTranscoders) + + // register 2 transcoders, 8 stream capacity each + go func() { rtm.Manage(transcoder_1, 8, capabilities.ToNetCapabilities()) }() + time.Sleep(1 * time.Millisecond) // allow time for first stream to register + go func() { rtm.Manage(transcoder_2, 8, capabilities.ToNetCapabilities()) }() + time.Sleep(1 * time.Millisecond) // allow time for second stream to register + + //Validate two transcoders exist + assert.NotNil(rtm.liveTranscoders[transcoder_1]) + assert.NotNil(rtm.liveTranscoders[transcoder_2]) + assert.Len(rtm.remoteTranscoders, 2) + + //Load up streams on transcoders + for i := 0; i < 8; i++ { + testSessionId := fmt.Sprintf("%s%d", "testID", i) + rtm.selectTranscoder(testSessionId, nil) + time.Sleep(1 * time.Millisecond) // allow time for second stream to register + } + + t1 := rtm.liveTranscoders[transcoder_1] + t2 := rtm.liveTranscoders[transcoder_2] + + //Validate 4 streams per transcoder + assert.Equal(4, t1.load) + assert.Equal(4, t2.load) + + //Break connection to transcoder by deleting live transcoder map entry + rtm.RTmutex.Lock() + delete(rtm.liveTranscoders, t1.stream) + rtm.RTmutex.Unlock() + + //Repeat selectTranscoder for existing session ids - will remove transcoder and shift streams to another transcoder + for i := 0; i < 8; i++ { + testSessionId := fmt.Sprintf("%s%d", "testID", i) + rtm.selectTranscoder(testSessionId, nil) + time.Sleep(1 * time.Millisecond) // allow time for second stream to register + } + + t1 = rtm.liveTranscoders[transcoder_1] + t2 = rtm.liveTranscoders[transcoder_2] + + //Validate all 8 streams on opposite transcoder + assert.Empty(t1) + assert.Equal(8, t2.load) +} + func TestTranscoderManagerTranscoding(t *testing.T) { m := NewRemoteTranscoderManager() s := &StubTranscoderServer{manager: m} From 641026a52a397a168a9a9c4ad8c3eefcb7a8da76 Mon Sep 17 00:00:00 2001 From: Elite Encoder Date: Mon, 13 Feb 2023 03:40:17 -0500 Subject: [PATCH 7/8] Removed unnecessary time.sleep, added missing assertion, updated comment --- core/orch_test.go | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/core/orch_test.go b/core/orch_test.go index a566ffe5b3..66e6c92259 100644 --- a/core/orch_test.go +++ b/core/orch_test.go @@ -429,13 +429,14 @@ func TestRemoveFromRemoteTranscoders_StreamFailover(t *testing.T) { var transcoder_2 = &StubTranscoderServer{manager: rtm} assert.Nil(rtm.liveTranscoders[transcoder_1]) + assert.Nil(rtm.liveTranscoders[transcoder_2]) assert.Empty(rtm.remoteTranscoders) // register 2 transcoders, 8 stream capacity each go func() { rtm.Manage(transcoder_1, 8, capabilities.ToNetCapabilities()) }() - time.Sleep(1 * time.Millisecond) // allow time for first stream to register + time.Sleep(1 * time.Millisecond) // allow time for remote T to connect go func() { rtm.Manage(transcoder_2, 8, capabilities.ToNetCapabilities()) }() - time.Sleep(1 * time.Millisecond) // allow time for second stream to register + time.Sleep(1 * time.Millisecond) // allow time for remote T to connect //Validate two transcoders exist assert.NotNil(rtm.liveTranscoders[transcoder_1]) @@ -446,7 +447,6 @@ func TestRemoveFromRemoteTranscoders_StreamFailover(t *testing.T) { for i := 0; i < 8; i++ { testSessionId := fmt.Sprintf("%s%d", "testID", i) rtm.selectTranscoder(testSessionId, nil) - time.Sleep(1 * time.Millisecond) // allow time for second stream to register } t1 := rtm.liveTranscoders[transcoder_1] @@ -465,7 +465,6 @@ func TestRemoveFromRemoteTranscoders_StreamFailover(t *testing.T) { for i := 0; i < 8; i++ { testSessionId := fmt.Sprintf("%s%d", "testID", i) rtm.selectTranscoder(testSessionId, nil) - time.Sleep(1 * time.Millisecond) // allow time for second stream to register } t1 = rtm.liveTranscoders[transcoder_1] From bd785c2011d6d71348e020137c8b032676669ffa Mon Sep 17 00:00:00 2001 From: Elite Encoder Date: Mon, 13 Feb 2023 11:36:19 -0500 Subject: [PATCH 8/8] Removed test --- core/orch_test.go | 56 ----------------------------------------------- 1 file changed, 56 deletions(-) diff --git a/core/orch_test.go b/core/orch_test.go index 66e6c92259..23453dcb2c 100644 --- a/core/orch_test.go +++ b/core/orch_test.go @@ -419,62 +419,6 @@ func TestRemoveFromRemoteTranscoders(t *testing.T) { assert.Equal(remoteTranscoderList, emptyTList) } -//Validates that selectTranscoder will shift all streams to another valid transcoder if transcoder fails. -func TestRemoveFromRemoteTranscoders_StreamFailover(t *testing.T) { - rtm := NewRemoteTranscoderManager() - assert := assert.New(t) - capabilities := NewCapabilities(append(DefaultCapabilities(), OptionalCapabilities()...), []Capability{}) - - var transcoder_1 = &StubTranscoderServer{manager: rtm} - var transcoder_2 = &StubTranscoderServer{manager: rtm} - - assert.Nil(rtm.liveTranscoders[transcoder_1]) - assert.Nil(rtm.liveTranscoders[transcoder_2]) - assert.Empty(rtm.remoteTranscoders) - - // register 2 transcoders, 8 stream capacity each - go func() { rtm.Manage(transcoder_1, 8, capabilities.ToNetCapabilities()) }() - time.Sleep(1 * time.Millisecond) // allow time for remote T to connect - go func() { rtm.Manage(transcoder_2, 8, capabilities.ToNetCapabilities()) }() - time.Sleep(1 * time.Millisecond) // allow time for remote T to connect - - //Validate two transcoders exist - assert.NotNil(rtm.liveTranscoders[transcoder_1]) - assert.NotNil(rtm.liveTranscoders[transcoder_2]) - assert.Len(rtm.remoteTranscoders, 2) - - //Load up streams on transcoders - for i := 0; i < 8; i++ { - testSessionId := fmt.Sprintf("%s%d", "testID", i) - rtm.selectTranscoder(testSessionId, nil) - } - - t1 := rtm.liveTranscoders[transcoder_1] - t2 := rtm.liveTranscoders[transcoder_2] - - //Validate 4 streams per transcoder - assert.Equal(4, t1.load) - assert.Equal(4, t2.load) - - //Break connection to transcoder by deleting live transcoder map entry - rtm.RTmutex.Lock() - delete(rtm.liveTranscoders, t1.stream) - rtm.RTmutex.Unlock() - - //Repeat selectTranscoder for existing session ids - will remove transcoder and shift streams to another transcoder - for i := 0; i < 8; i++ { - testSessionId := fmt.Sprintf("%s%d", "testID", i) - rtm.selectTranscoder(testSessionId, nil) - } - - t1 = rtm.liveTranscoders[transcoder_1] - t2 = rtm.liveTranscoders[transcoder_2] - - //Validate all 8 streams on opposite transcoder - assert.Empty(t1) - assert.Equal(8, t2.load) -} - func TestTranscoderManagerTranscoding(t *testing.T) { m := NewRemoteTranscoderManager() s := &StubTranscoderServer{manager: m}