diff --git a/CHANGELOG_PENDING.md b/CHANGELOG_PENDING.md index 77612a162..8cecb6373 100644 --- a/CHANGELOG_PENDING.md +++ b/CHANGELOG_PENDING.md @@ -23,6 +23,8 @@ #### CLI - \#2438 Add new menu option to gracefully exit livepeer_cli (@emranemran) +- \#2431 New flag to enable content detection on Nvidia Transcoder: `-detectContent`. When set, Transcoder will initialize Tensorflow runtime on each Nvidia GPU, and will run an additional Detector profile, if requested by the transcoding job. + #### General diff --git a/cmd/livepeer/livepeer.go b/cmd/livepeer/livepeer.go index c67286d65..e7665e15e 100644 --- a/cmd/livepeer/livepeer.go +++ b/cmd/livepeer/livepeer.go @@ -122,6 +122,7 @@ func parseLivepeerConfig() starter.LivepeerConfig { cfg.Netint = flag.String("netint", *cfg.Netint, "Comma-separated list of NetInt device GUIDs (or \"all\" for all available devices)") cfg.TestTranscoder = flag.Bool("testTranscoder", *cfg.TestTranscoder, "Test Nvidia GPU transcoding at startup") cfg.SceneClassificationModelPath = flag.String("sceneClassificationModelPath", *cfg.SceneClassificationModelPath, "Path to scene classification model") + cfg.DetectContent = flag.Bool("detectContent", *cfg.DetectContent, "Set to true to enable content type detection") // Onchain: cfg.EthAcctAddr = flag.String("ethAcctAddr", *cfg.EthAcctAddr, "Existing Eth account address") diff --git a/cmd/livepeer/starter/starter.go b/cmd/livepeer/starter/starter.go index 4806022fe..f635667b4 100644 --- a/cmd/livepeer/starter/starter.go +++ b/cmd/livepeer/starter/starter.go @@ -85,6 +85,7 @@ type LivepeerConfig struct { Netint *string TestTranscoder *bool SceneClassificationModelPath *string + DetectContent *bool EthAcctAddr *string EthPassword *string EthKeystorePath *string @@ -146,7 +147,8 @@ func DefaultLivepeerConfig() LivepeerConfig { defaultNvidia := "" defaultNetint := "" defaultTestTranscoder := true - defaultSceneClassificationModelPath := "" + defaultDetectContent := false + defaultSceneClassificationModelPath := "tasmodel.pb" // Onchain: defaultEthAcctAddr := "" @@ -211,6 +213,7 @@ func DefaultLivepeerConfig() LivepeerConfig { Netint: &defaultNetint, TestTranscoder: &defaultTestTranscoder, SceneClassificationModelPath: &defaultSceneClassificationModelPath, + DetectContent: &defaultDetectContent, // Onchain: EthAcctAddr: &defaultEthAcctAddr, @@ -387,18 +390,27 @@ func StartLivepeer(ctx context.Context, cfg LivepeerConfig) { glog.Fatal(err) return } + } else { + // no capability test was run, assume default capabilities + transcoderCaps = append(transcoderCaps, core.DefaultCapabilities()...) } - // FIXME: Short-term hack to pre-load the detection models on every device - if accel == ffmpeg.Nvidia && *cfg.SceneClassificationModelPath != "" { - detectorProfile := ffmpeg.DSceneAdultSoccer - detectorProfile.ModelPath = *cfg.SceneClassificationModelPath - core.DetectorProfile = &detectorProfile - for _, d := range devices { - tc, err := core.NewNvidiaTranscoderWithDetector(&detectorProfile, d) - if err != nil { - glog.Fatalf("Could not initialize detector") + // initialize Tensorflow runtime on each device to reduce delay when creating new transcoding session + if accel == ffmpeg.Nvidia && *cfg.DetectContent { + if _, err := os.Stat(*cfg.SceneClassificationModelPath); err == nil { + detectorProfile := ffmpeg.DSceneAdultSoccer + detectorProfile.ModelPath = *cfg.SceneClassificationModelPath + core.DetectorProfile = &detectorProfile + for _, d := range devices { + tc, err := core.NewNvidiaTranscoderWithDetector(&detectorProfile, d) + if err != nil { + glog.Fatalf("Could not initialize content detector") + } + defer tc.Stop() } - defer tc.Stop() + // add SceneClassification capability + transcoderCaps = append(transcoderCaps, core.Capability_SceneClassification) + } else { + glog.Fatalf("Content detection is enabled, but the model file '%s' does not exist", *cfg.SceneClassificationModelPath) } } // Initialize LB transcoder @@ -1008,11 +1020,6 @@ func StartLivepeer(ctx context.Context, cfg LivepeerConfig) { // if http addr is not provided, listen to all ifaces // take the port to listen to from the service URI *cfg.HttpAddr = defaultAddr(*cfg.HttpAddr, "", n.GetServiceURI().Port()) - - if *cfg.SceneClassificationModelPath != "" { - // Only enable experimental capabilities if scene classification model is actually loaded - transcoderCaps = append(transcoderCaps, core.ExperimentalCapabilities()...) - } if !*cfg.Transcoder && n.OrchSecret == "" { glog.Fatal("Running an orchestrator requires an -orchSecret for standalone mode or -transcoder for orchestrator+transcoder mode") } diff --git a/common/util_test.go b/common/util_test.go index 0bb0cb46a..9a541669d 100644 --- a/common/util_test.go +++ b/common/util_test.go @@ -353,6 +353,22 @@ func TestRatPriceInfo(t *testing.T) { assert.Zero(priceInfo.Cmp(big.NewRat(7, 2))) } +func TestParseAccelDevices_FailedDetection(t *testing.T) { + assert := assert.New(t) + + getGPU = func() ([]*gpu.GraphicsCard, error) { + return []*gpu.GraphicsCard{}, nil + } + getPCI = func() ([]*pci.Device, error) { + return []*pci.Device{}, nil + } + + ids, err := ParseAccelDevices("all", ffmpeg.Nvidia) + + assert.NotNil(err) + assert.Equal(len(ids), 0) +} + func TestParseAccessDevices_Gpu(t *testing.T) { assert := assert.New(t) @@ -458,15 +474,6 @@ func TestParseAccelDevices_WrongDriver(t *testing.T) { getPCI = originGetPCI } -func TestParseAccelDevices_FailedDetection(t *testing.T) { - assert := assert.New(t) - - ids, err := ParseAccelDevices("all", ffmpeg.Nvidia) - - assert.NotNil(err) - assert.Equal(len(ids), 0) -} - func TestParseAccelDevices_CustomSelection(t *testing.T) { assert := assert.New(t) diff --git a/server/ot_rpc.go b/server/ot_rpc.go index 0e8a133e9..3c822449e 100644 --- a/server/ot_rpc.go +++ b/server/ot_rpc.go @@ -4,8 +4,10 @@ import ( "bytes" "context" "crypto/tls" + "encoding/json" "errors" "fmt" + "github.com/livepeer/lpms/ffmpeg" "io" "io/ioutil" "mime" @@ -259,8 +261,17 @@ func sendTranscodeResult(ctx context.Context, n *core.LivepeerNode, orchAddr str req.Header.Set("Credentials", n.OrchSecret) req.Header.Set("Content-Type", contentType) req.Header.Set("TaskId", strconv.FormatInt(notify.TaskId, 10)) + pixels := int64(0) + // add detections if tData != nil { + detectData, err := json.Marshal(tData.Detections) + if err != nil { + clog.Errorf(ctx, "Error posting results, couldn't serialize detection data orch=%s staskId=%d url=%s err=%q", orchAddr, + notify.TaskId, notify.Url, err) + return + } + req.Header.Set("Detections", string(detectData)) pixels = tData.Pixels } req.Header.Set("Pixels", strconv.FormatInt(pixels, 10)) @@ -343,6 +354,19 @@ func (h *lphttp) TranscodeResults(w http.ResponseWriter, r *http.Request) { return } + // read detection data - only scene classification is supported + var detections []ffmpeg.DetectData + var sceneDetections []ffmpeg.SceneClassificationData + err = json.Unmarshal([]byte(r.Header.Get("Detections")), &sceneDetections) + if err != nil { + glog.Error("Could not parse detection data ", err) + http.Error(w, "Invalid detection data", http.StatusBadRequest) + return + } + for _, sd := range sceneDetections { + detections = append(detections, sd) + } + var res core.RemoteTranscoderResult if transcodingErrorMimeType == mediaType { w.Write([]byte("OK")) @@ -407,8 +431,9 @@ func (h *lphttp) TranscodeResults(w http.ResponseWriter, r *http.Request) { } } res.TranscodeData = &core.TranscodeData{ - Segments: segments, - Pixels: decodedPixels, + Segments: segments, + Pixels: decodedPixels, + Detections: detections, } dlDur := time.Since(start) glog.V(common.VERBOSE).Infof("Downloaded results from remote transcoder=%s taskId=%d dur=%s", r.RemoteAddr, tid, dlDur)