Skip to content

Commit

Permalink
server: Add suspending sessions that did not pass the pHash verificat…
Browse files Browse the repository at this point in the history
…ion (#2103)

server: Add suspending sessions that did not pass the pHash verification
  • Loading branch information
leszko authored Nov 22, 2021
1 parent 2f6c226 commit 02460b9
Show file tree
Hide file tree
Showing 3 changed files with 92 additions and 5 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG_PENDING.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@
- \#2085 Set max refresh sessions threshold to 8 (@yondonfu)
- \#2083 Return 422 to the push client after max retry attempts for a segment (@jailuthra)
- \#2022 Randomize selection of orchestrators in untrusted pool at a random frequency (@yondonfu)
- \#2100 Check verified session first while choosing the result from multiple untrusted sessions (@leszko)
- \#2100 Check verified session first while choosing the result from multiple untrusted sessions (@leszko)
- \#2103 Suspend sessions that did not pass p-hash verification (@leszko)

#### Orchestrator

Expand Down
12 changes: 10 additions & 2 deletions server/broadcast.go
Original file line number Diff line number Diff line change
Expand Up @@ -495,6 +495,7 @@ func (bsm *BroadcastSessionsManager) chooseResults(submitResultsCh chan *SubmitR
}

// verify untrusted hashes
var sessionsToSuspend []*BroadcastSession
for _, untrustedResult := range untrustedResults {
untrustedHash, err := drivers.GetSegmentData(untrustedResult.TranscodeResult.Segments[segmToCheckIndex].PerceptualHashUrl)
if err != nil {
Expand All @@ -518,9 +519,16 @@ func (bsm *BroadcastSessionsManager) chooseResults(submitResultsCh chan *SubmitR
if untrustedResult.Err == nil {
bsm.sessionVerified(untrustedResult.Session)
}
// suspend sessions which returned incorrect results
for _, s := range sessionsToSuspend {
bsm.suspendAndRemoveOrch(s)
}
return untrustedResult.Session, untrustedResult.TranscodeResult, untrustedResult.Err
} else if monitor.Enabled {
monitor.FastVerificationFailed()
} else {
sessionsToSuspend = append(sessionsToSuspend, untrustedResult.Session)
if monitor.Enabled {
monitor.FastVerificationFailed()
}
}
}

Expand Down
82 changes: 80 additions & 2 deletions server/push_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1526,6 +1526,7 @@ func TestPush_ReuseIntmidWithDiffExtmid(t *testing.T) {
assert.False(extEx2)
serverCleanup(s)
}

func TestPush_MultipartReturnMultiSession(t *testing.T) {
assert := assert.New(t)

Expand Down Expand Up @@ -1590,7 +1591,31 @@ func TestPush_MultipartReturnMultiSession(t *testing.T) {
w.WriteHeader(http.StatusOK)
w.Write([]byte("UNtrusted transcoded binary data"))
})
unverifiedHash := goodHash
unverifiedHashCalled := 0
mux2.HandleFunc(segPath+".phash", func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
w.Write(unverifiedHash)
unverifiedHashCalled++
})

ts3, mux3 := stubTLSServer()
defer ts3.Close()
tSegData3 := []*net.TranscodedSegmentData{{Url: ts3.URL + segPath, Pixels: 100, PerceptualHashUrl: ts3.URL + segPath + ".phash"}}
tr3 := dummyRes(tSegData3)
buf3, err := proto.Marshal(tr3)
require.Nil(t, err)
mux3.HandleFunc("/segment", func(w http.ResponseWriter, r *http.Request) {
// delay so it will be chosen second
time.Sleep(50 * time.Millisecond)
w.WriteHeader(http.StatusOK)
w.Write(buf3)
})
mux3.HandleFunc(segPath, func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
w.Write([]byte("second UNtrusted transcoded binary data"))
})
mux3.HandleFunc(segPath+".phash", func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
w.Write(goodHash)
})
Expand All @@ -1604,14 +1629,23 @@ func TestPush_MultipartReturnMultiSession(t *testing.T) {
sess2.Params.ManifestID = "mani"
sess2.OrchestratorScore = common.Score_Untrusted

bsm := bsmWithSessListExt([]*BroadcastSession{sess1}, []*BroadcastSession{sess2}, false)
sess3 := StubBroadcastSession(ts3.URL)
sess3.Params.Profiles = []ffmpeg.VideoProfile{ffmpeg.P144p30fps16x9}
sess3.Params.ManifestID = "mani"
sess3.OrchestratorScore = common.Score_Untrusted

bsm := bsmWithSessListExt([]*BroadcastSession{sess1}, []*BroadcastSession{sess3, sess2}, false)
bsm.VerificationFreq = 1
assert.Equal(0, bsm.untrustedPool.sus.count)
// hack: stop pool from refreshing
bsm.untrustedPool.refreshing = true

url, _ := url.ParseRequestURI("test://some.host")
osd := drivers.NewMemoryDriver(url)
osSession := osd.NewSession("testPath")
sess1.BroadcasterOS = osSession
sess2.BroadcasterOS = osSession
sess3.BroadcasterOS = osSession

oldjpqt := core.JsonPlaylistQuitTimeout
defer func() {
Expand Down Expand Up @@ -1652,7 +1686,7 @@ func TestPush_MultipartReturnMultiSession(t *testing.T) {
assert.Nil(err)
assert.Contains(params, "name")
assert.Len(params, 1)
assert.Equal(params["name"], "P144p25fps16x9_17.ts")
assert.Equal("P144p25fps16x9_17.ts", params["name"])
assert.Equal(`attachment; filename="P144p25fps16x9_17.ts"`, p.Header.Get("Content-Disposition"))
assert.Equal("P144p25fps16x9", p.Header.Get("Rendition-Name"))
bodyPart, err := ioutil.ReadAll(p)
Expand All @@ -1665,4 +1699,48 @@ func TestPush_MultipartReturnMultiSession(t *testing.T) {
assert.Equal(1, i)
assert.Equal(uint64(12), cxn.sourceBytes)
assert.Equal(uint64(32), cxn.transcodedBytes)

// now make unverified to respond with bad hash
unverifiedHash = []byte{0}
reader = strings.NewReader("InsteadOf.TS")
w = httptest.NewRecorder()
req = httptest.NewRequest("POST", "/live/mani/18.ts", reader)

req.Header.Set("Accept", "multipart/mixed")
s.HandlePush(w, req)
resp = w.Result()
defer resp.Body.Close()
assert.Equal(200, resp.StatusCode)

mediaType, params, err = mime.ParseMediaType(resp.Header.Get("Content-Type"))
assert.Equal("multipart/mixed", mediaType)
assert.Nil(err)
mr = multipart.NewReader(resp.Body, params["boundary"])
i = 0
for {
p, err := mr.NextPart()
if err == io.EOF {
break
}
assert.NoError(err)
mediaType, params, err := mime.ParseMediaType(p.Header.Get("Content-Type"))
assert.Nil(err)
assert.Contains(params, "name")
assert.Len(params, 1)
assert.Equal("P144p25fps16x9_18.ts", params["name"])
assert.Equal(`attachment; filename="P144p25fps16x9_18.ts"`, p.Header.Get("Content-Disposition"))
assert.Equal("P144p25fps16x9", p.Header.Get("Rendition-Name"))
bodyPart, err := ioutil.ReadAll(p)
assert.NoError(err)
assert.Equal("video/mp2t", strings.ToLower(mediaType))
assert.Equal("second UNtrusted transcoded binary data", string(bodyPart))

i++
}
assert.Equal(1, i)
assert.Equal(uint64(12*2), cxn.sourceBytes)
assert.Equal(uint64(71), cxn.transcodedBytes)
assert.Equal(2, unverifiedHashCalled)
assert.Contains(bsm.untrustedPool.sus.list, ts2.URL)
assert.Equal(0, bsm.untrustedPool.sus.count)
}

0 comments on commit 02460b9

Please sign in to comment.