From 3acf4eab323df725f60feae6a8b288030cd58fb5 Mon Sep 17 00:00:00 2001 From: Max Holland Date: Mon, 27 Jan 2025 09:50:24 +0000 Subject: [PATCH] Segment file cleanup process (#3371) * Segment file cleanup process * Provide a way to cancel * windows build fix --- cmd/livepeer/starter/starter.go | 2 +- media/rtmp2segment.go | 50 +++++++++++++++++++++++++++++++-- media/rtmp2segment_windows.go | 4 +++ server/ai_mediaserver.go | 3 +- server/ai_worker_test.go | 23 --------------- server/cliserver_test.go | 17 ++++++++--- server/mediaserver.go | 6 ++-- server/mediaserver_test.go | 2 +- server/push_test.go | 8 +++--- 9 files changed, 77 insertions(+), 38 deletions(-) diff --git a/cmd/livepeer/starter/starter.go b/cmd/livepeer/starter/starter.go index 19ddcd991a..dd264845ec 100755 --- a/cmd/livepeer/starter/starter.go +++ b/cmd/livepeer/starter/starter.go @@ -1589,7 +1589,7 @@ func StartLivepeer(ctx context.Context, cfg LivepeerConfig) { //Create Livepeer Node //Set up the media server - s, err := server.NewLivepeerServer(*cfg.RtmpAddr, n, httpIngest, *cfg.TranscodingOptions) + s, err := server.NewLivepeerServer(ctx, *cfg.RtmpAddr, n, httpIngest, *cfg.TranscodingOptions) if err != nil { exit("Error creating Livepeer server: err=%q", err) } diff --git a/media/rtmp2segment.go b/media/rtmp2segment.go index c2770dbf99..60783844e2 100644 --- a/media/rtmp2segment.go +++ b/media/rtmp2segment.go @@ -22,7 +22,12 @@ import ( "golang.org/x/sys/unix" ) -var waitTimeout = 20 * time.Second +const ( + waitTimeout = 20 * time.Second + fileCleanupInterval = time.Hour + fileCleanupMaxAge = 4 * time.Hour + outFileSuffix = ".ts" +) type MediaSegmenter struct { Workdir string @@ -30,7 +35,7 @@ type MediaSegmenter struct { } func (ms *MediaSegmenter) RunSegmentation(ctx context.Context, in string, segmentHandler SegmentHandler) { - outFilePattern := filepath.Join(ms.Workdir, randomString()+"-%d.ts") + outFilePattern := filepath.Join(ms.Workdir, randomString()+"-%d"+outFileSuffix) completionSignal := make(chan bool, 1) procCtx, procCancel := context.WithCancel(context.Background()) // parent ctx is a short lived http request wg := &sync.WaitGroup{} @@ -257,3 +262,44 @@ func randomString() string { } return strings.TrimRight(base32.StdEncoding.EncodeToString(b), "=") } + +// StartFileCleanup starts a goroutine to periodically remove any old temporary files accidentally left behind +func StartFileCleanup(ctx context.Context, workDir string) { + go func() { + ticker := time.NewTicker(fileCleanupInterval) + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + if err := cleanUpLocalTmpFiles(ctx, workDir, "*"+outFileSuffix, fileCleanupMaxAge); err != nil { + clog.Errorf(ctx, "Error cleaning up segment files: %v", err) + } + } + } + }() +} + +func cleanUpLocalTmpFiles(ctx context.Context, dir string, filenamePattern string, maxAge time.Duration) error { + filesRemoved := 0 + err := filepath.Walk(dir, func(path string, info os.FileInfo, err error) error { + if err != nil { + return nil + } + if info.Mode().IsRegular() { + if match, _ := filepath.Match(filenamePattern, info.Name()); match { + if time.Since(info.ModTime()) > maxAge { + err = os.Remove(path) + if err != nil { + return fmt.Errorf("error removing file %s: %w", path, err) + } + filesRemoved++ + } + } + } + return nil + }) + clog.Infof(ctx, "Segment file cleanup removed %d files", filesRemoved) + return err +} diff --git a/media/rtmp2segment_windows.go b/media/rtmp2segment_windows.go index 444943a990..b9d03aca4a 100644 --- a/media/rtmp2segment_windows.go +++ b/media/rtmp2segment_windows.go @@ -12,3 +12,7 @@ type MediaSegmenter struct { func (ms *MediaSegmenter) RunSegmentation(ctx context.Context, in string, segmentHandler SegmentHandler) { // Not supported for Windows } + +func StartFileCleanup(ctx context.Context, workDir string) { + // Not supported for Windows +} diff --git a/server/ai_mediaserver.go b/server/ai_mediaserver.go index 16e45b59d9..2a296d3df6 100644 --- a/server/ai_mediaserver.go +++ b/server/ai_mediaserver.go @@ -51,7 +51,7 @@ const ( Complete ImageToVideoStatus = "complete" ) -func startAIMediaServer(ls *LivepeerServer) error { +func startAIMediaServer(ctx context.Context, ls *LivepeerServer) error { swagger, err := worker.GetSwagger() if err != nil { return err @@ -90,6 +90,7 @@ func startAIMediaServer(ls *LivepeerServer) error { // Stream status ls.HTTPMux.Handle("/live/video-to-video/{streamId}/status", ls.GetLiveVideoToVideoStatus()) + media.StartFileCleanup(ctx, ls.LivepeerNode.WorkDir) return nil } diff --git a/server/ai_worker_test.go b/server/ai_worker_test.go index ab31a3e712..8e33c9ff63 100644 --- a/server/ai_worker_test.go +++ b/server/ai_worker_test.go @@ -19,7 +19,6 @@ import ( "github.com/livepeer/ai-worker/worker" "github.com/livepeer/go-livepeer/common" "github.com/livepeer/go-livepeer/core" - "github.com/livepeer/go-livepeer/eth" "github.com/livepeer/go-livepeer/net" "github.com/livepeer/go-tools/drivers" oapitypes "github.com/oapi-codegen/runtime/types" @@ -403,11 +402,6 @@ func createAIJob(taskId int64, pipeline, modelId, inputUrl string) *net.NotifyAI return notify } -type stubResult struct { - Attachment []byte - Result string -} - func aiResultsTest(l lphttp, w *httptest.ResponseRecorder, r *http.Request) (int, string) { handler := l.AIResults() handler.ServeHTTP(w, r) @@ -418,23 +412,6 @@ func aiResultsTest(l lphttp, w *httptest.ResponseRecorder, r *http.Request) (int return resp.StatusCode, string(body) } -func newMockAIOrchestratorServer() *httptest.Server { - n, _ := core.NewLivepeerNode(ð.StubClient{}, "./tmp", nil) - n.NodeType = core.OrchestratorNode - n.AIWorkerManager = core.NewRemoteAIWorkerManager() - s, _ := NewLivepeerServer("127.0.0.1:1938", n, true, "") - mux := s.cliWebServerHandlers("addr") - srv := httptest.NewServer(mux) - return srv -} - -func connectWorker(n *core.LivepeerNode) { - strm := &StubAIWorkerServer{} - caps := createStubAIWorkerCapabilities() - go func() { n.AIWorkerManager.Manage(strm, caps.ToNetCapabilities()) }() - time.Sleep(1 * time.Millisecond) -} - func createStubAIWorkerCapabilities() *core.Capabilities { //create capabilities and constraints the ai worker sends to orch constraints := make(core.PerCapabilityConstraints) diff --git a/server/cliserver_test.go b/server/cliserver_test.go index 2083759021..6b3f9800cc 100644 --- a/server/cliserver_test.go +++ b/server/cliserver_test.go @@ -2,6 +2,7 @@ package server import ( "bytes" + "context" "encoding/json" "errors" "fmt" @@ -33,7 +34,9 @@ func newMockServer() *httptest.Server { go func() { n.TranscoderManager.Manage(strm, 5, nil) }() time.Sleep(1 * time.Millisecond) n.Transcoder = n.TranscoderManager - s, _ := NewLivepeerServer("127.0.0.1:1938", n, true, "") + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + s, _ := NewLivepeerServer(ctx, "127.0.0.1:1938", n, true, "") mux := s.cliWebServerHandlers("addr") srv := httptest.NewServer(mux) return srv @@ -52,7 +55,9 @@ func TestActivateOrchestrator(t *testing.T) { go func() { n.TranscoderManager.Manage(strm, 5, nil) }() time.Sleep(1 * time.Millisecond) n.Transcoder = n.TranscoderManager - s, _ := NewLivepeerServer("127.0.0.1:1938", n, true, "") + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + s, _ := NewLivepeerServer(ctx, "127.0.0.1:1938", n, true, "") mux := s.cliWebServerHandlers("addr") srv := httptest.NewServer(mux) defer srv.Close() @@ -220,7 +225,9 @@ func TestGetEthChainID(t *testing.T) { err = dbh.SetChainID(big.NewInt(1)) require.Nil(err) n, _ := core.NewLivepeerNode(ð.StubClient{}, "./tmp", dbh) - s, _ := NewLivepeerServer("127.0.0.1:1938", n, true, "") + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + s, _ := NewLivepeerServer(ctx, "127.0.0.1:1938", n, true, "") mux := s.cliWebServerHandlers("addr") srv := httptest.NewServer(mux) defer srv.Close() @@ -308,7 +315,9 @@ func TestRegisteredOrchestrators(t *testing.T) { n, _ := core.NewLivepeerNode(eth, "./tmp", dbh) - s, _ := NewLivepeerServer("127.0.0.1:1938", n, true, "") + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + s, _ := NewLivepeerServer(ctx, "127.0.0.1:1938", n, true, "") mux := s.cliWebServerHandlers("addr") srv := httptest.NewServer(mux) defer srv.Close() diff --git a/server/mediaserver.go b/server/mediaserver.go index aaec9210f6..95bd8571af 100644 --- a/server/mediaserver.go +++ b/server/mediaserver.go @@ -153,7 +153,7 @@ type authWebhookResponse struct { ForceSessionReinit bool `json:"forceSessionReinit"` } -func NewLivepeerServer(rtmpAddr string, lpNode *core.LivepeerNode, httpIngest bool, transcodingOptions string) (*LivepeerServer, error) { +func NewLivepeerServer(ctx context.Context, rtmpAddr string, lpNode *core.LivepeerNode, httpIngest bool, transcodingOptions string) (*LivepeerServer, error) { opts := lpmscore.LPMSOpts{ RtmpAddr: rtmpAddr, RtmpDisabled: true, @@ -201,7 +201,9 @@ func NewLivepeerServer(rtmpAddr string, lpNode *core.LivepeerNode, httpIngest bo if lpNode.NodeType == core.BroadcasterNode && httpIngest { opts.HttpMux.HandleFunc("/live/", ls.HandlePush) - startAIMediaServer(ls) + if err := startAIMediaServer(ctx, ls); err != nil { + return nil, fmt.Errorf("failed to start AI media server: %w", err) + } } opts.HttpMux.HandleFunc("/recordings/", ls.HandleRecordings) return ls, nil diff --git a/server/mediaserver_test.go b/server/mediaserver_test.go index 16b4d91e23..6f97de4582 100644 --- a/server/mediaserver_test.go +++ b/server/mediaserver_test.go @@ -88,7 +88,7 @@ func setupServerWithCancel() (*LivepeerServer, context.CancelFunc) { } n, _ := core.NewLivepeerNode(nil, "./tmp", nil) // doesn't really starts server at 1938 - S, _ = NewLivepeerServer("127.0.0.1:1938", n, true, "") + S, _ = NewLivepeerServer(ctx, "127.0.0.1:1938", n, true, "") // rtmpurl := fmt.Sprintf("rtmp://127.0.0.1:%d", port) // S, _ = NewLivepeerServer(rtmpurl, n, true, "") // glog.Errorf("++> rtmp server with port %d", port) diff --git a/server/push_test.go b/server/push_test.go index bb17ac8a11..254b6d64e8 100644 --- a/server/push_test.go +++ b/server/push_test.go @@ -426,7 +426,7 @@ func TestPush_HTTPIngest(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) // HTTP ingest disabled - s, _ := NewLivepeerServer("127.0.0.1:1938", n, false, "") + s, _ := NewLivepeerServer(ctx, "127.0.0.1:1938", n, false, "") s.SetContextFromUnitTest(ctx) h, pattern := s.HTTPMux.Handler(req) assert.Equal("", pattern) @@ -440,7 +440,7 @@ func TestPush_HTTPIngest(t *testing.T) { ctx, cancel = context.WithCancel(context.Background()) // HTTP ingest enabled - s, _ = NewLivepeerServer("127.0.0.1:1938", n, true, "") + s, _ = NewLivepeerServer(ctx, "127.0.0.1:1938", n, true, "") s.SetContextFromUnitTest(ctx) h, pattern = s.HTTPMux.Handler(req) assert.Equal("/live/", pattern) @@ -1106,8 +1106,8 @@ func TestPush_OSPerStream(t *testing.T) { assert := assert.New(t) drivers.NodeStorage = drivers.NewMemoryDriver(nil) n, _ := core.NewLivepeerNode(nil, "./tmp", nil) - s, _ := NewLivepeerServer("127.0.0.1:1939", n, true, "") serverCtx, serverCancel := context.WithCancel(context.Background()) + s, _ := NewLivepeerServer(serverCtx, "127.0.0.1:1939", n, true, "") s.SetContextFromUnitTest(serverCtx) defer serverCleanup(s) @@ -1273,8 +1273,8 @@ func TestPush_ConcurrentSegments(t *testing.T) { drivers.NodeStorage = drivers.NewMemoryDriver(nil) n, _ := core.NewLivepeerNode(nil, "./tmp", nil) n.NodeType = core.BroadcasterNode - s, _ := NewLivepeerServer("127.0.0.1:1938", n, true, "") serverCtx, serverCancel := context.WithCancel(context.Background()) + s, _ := NewLivepeerServer(serverCtx, "127.0.0.1:1938", n, true, "") s.SetContextFromUnitTest(serverCtx) oldURL := AuthWebhookURL defer func() { AuthWebhookURL = oldURL }()