Skip to content

Commit

Permalink
Segment file cleanup process (#3371)
Browse files Browse the repository at this point in the history
* Segment file cleanup process

* Provide a way to cancel

* windows build fix
  • Loading branch information
mjh1 authored Jan 27, 2025
1 parent 56ab75e commit 3acf4ea
Show file tree
Hide file tree
Showing 9 changed files with 77 additions and 38 deletions.
2 changes: 1 addition & 1 deletion cmd/livepeer/starter/starter.go
Original file line number Diff line number Diff line change
Expand Up @@ -1589,7 +1589,7 @@ func StartLivepeer(ctx context.Context, cfg LivepeerConfig) {
//Create Livepeer Node

//Set up the media server
s, err := server.NewLivepeerServer(*cfg.RtmpAddr, n, httpIngest, *cfg.TranscodingOptions)
s, err := server.NewLivepeerServer(ctx, *cfg.RtmpAddr, n, httpIngest, *cfg.TranscodingOptions)
if err != nil {
exit("Error creating Livepeer server: err=%q", err)
}
Expand Down
50 changes: 48 additions & 2 deletions media/rtmp2segment.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,20 @@ import (
"golang.org/x/sys/unix"
)

var waitTimeout = 20 * time.Second
const (
waitTimeout = 20 * time.Second
fileCleanupInterval = time.Hour
fileCleanupMaxAge = 4 * time.Hour
outFileSuffix = ".ts"
)

type MediaSegmenter struct {
Workdir string
MediaMTXClient *MediaMTXClient
}

func (ms *MediaSegmenter) RunSegmentation(ctx context.Context, in string, segmentHandler SegmentHandler) {
outFilePattern := filepath.Join(ms.Workdir, randomString()+"-%d.ts")
outFilePattern := filepath.Join(ms.Workdir, randomString()+"-%d"+outFileSuffix)
completionSignal := make(chan bool, 1)
procCtx, procCancel := context.WithCancel(context.Background()) // parent ctx is a short lived http request
wg := &sync.WaitGroup{}
Expand Down Expand Up @@ -257,3 +262,44 @@ func randomString() string {
}
return strings.TrimRight(base32.StdEncoding.EncodeToString(b), "=")
}

// StartFileCleanup starts a goroutine to periodically remove any old temporary files accidentally left behind
func StartFileCleanup(ctx context.Context, workDir string) {
go func() {
ticker := time.NewTicker(fileCleanupInterval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
if err := cleanUpLocalTmpFiles(ctx, workDir, "*"+outFileSuffix, fileCleanupMaxAge); err != nil {
clog.Errorf(ctx, "Error cleaning up segment files: %v", err)
}
}
}
}()
}

func cleanUpLocalTmpFiles(ctx context.Context, dir string, filenamePattern string, maxAge time.Duration) error {
filesRemoved := 0
err := filepath.Walk(dir, func(path string, info os.FileInfo, err error) error {
if err != nil {
return nil
}
if info.Mode().IsRegular() {
if match, _ := filepath.Match(filenamePattern, info.Name()); match {
if time.Since(info.ModTime()) > maxAge {
err = os.Remove(path)
if err != nil {
return fmt.Errorf("error removing file %s: %w", path, err)
}
filesRemoved++
}
}
}
return nil
})
clog.Infof(ctx, "Segment file cleanup removed %d files", filesRemoved)
return err
}
4 changes: 4 additions & 0 deletions media/rtmp2segment_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,7 @@ type MediaSegmenter struct {
func (ms *MediaSegmenter) RunSegmentation(ctx context.Context, in string, segmentHandler SegmentHandler) {
// Not supported for Windows
}

func StartFileCleanup(ctx context.Context, workDir string) {
// Not supported for Windows
}
3 changes: 2 additions & 1 deletion server/ai_mediaserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ const (
Complete ImageToVideoStatus = "complete"
)

func startAIMediaServer(ls *LivepeerServer) error {
func startAIMediaServer(ctx context.Context, ls *LivepeerServer) error {
swagger, err := worker.GetSwagger()
if err != nil {
return err
Expand Down Expand Up @@ -90,6 +90,7 @@ func startAIMediaServer(ls *LivepeerServer) error {
// Stream status
ls.HTTPMux.Handle("/live/video-to-video/{streamId}/status", ls.GetLiveVideoToVideoStatus())

media.StartFileCleanup(ctx, ls.LivepeerNode.WorkDir)
return nil
}

Expand Down
23 changes: 0 additions & 23 deletions server/ai_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"github.com/livepeer/ai-worker/worker"
"github.com/livepeer/go-livepeer/common"
"github.com/livepeer/go-livepeer/core"
"github.com/livepeer/go-livepeer/eth"
"github.com/livepeer/go-livepeer/net"
"github.com/livepeer/go-tools/drivers"
oapitypes "github.com/oapi-codegen/runtime/types"
Expand Down Expand Up @@ -403,11 +402,6 @@ func createAIJob(taskId int64, pipeline, modelId, inputUrl string) *net.NotifyAI
return notify
}

type stubResult struct {
Attachment []byte
Result string
}

func aiResultsTest(l lphttp, w *httptest.ResponseRecorder, r *http.Request) (int, string) {
handler := l.AIResults()
handler.ServeHTTP(w, r)
Expand All @@ -418,23 +412,6 @@ func aiResultsTest(l lphttp, w *httptest.ResponseRecorder, r *http.Request) (int
return resp.StatusCode, string(body)
}

func newMockAIOrchestratorServer() *httptest.Server {
n, _ := core.NewLivepeerNode(&eth.StubClient{}, "./tmp", nil)
n.NodeType = core.OrchestratorNode
n.AIWorkerManager = core.NewRemoteAIWorkerManager()
s, _ := NewLivepeerServer("127.0.0.1:1938", n, true, "")
mux := s.cliWebServerHandlers("addr")
srv := httptest.NewServer(mux)
return srv
}

func connectWorker(n *core.LivepeerNode) {
strm := &StubAIWorkerServer{}
caps := createStubAIWorkerCapabilities()
go func() { n.AIWorkerManager.Manage(strm, caps.ToNetCapabilities()) }()
time.Sleep(1 * time.Millisecond)
}

func createStubAIWorkerCapabilities() *core.Capabilities {
//create capabilities and constraints the ai worker sends to orch
constraints := make(core.PerCapabilityConstraints)
Expand Down
17 changes: 13 additions & 4 deletions server/cliserver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package server

import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
Expand Down Expand Up @@ -33,7 +34,9 @@ func newMockServer() *httptest.Server {
go func() { n.TranscoderManager.Manage(strm, 5, nil) }()
time.Sleep(1 * time.Millisecond)
n.Transcoder = n.TranscoderManager
s, _ := NewLivepeerServer("127.0.0.1:1938", n, true, "")
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
s, _ := NewLivepeerServer(ctx, "127.0.0.1:1938", n, true, "")
mux := s.cliWebServerHandlers("addr")
srv := httptest.NewServer(mux)
return srv
Expand All @@ -52,7 +55,9 @@ func TestActivateOrchestrator(t *testing.T) {
go func() { n.TranscoderManager.Manage(strm, 5, nil) }()
time.Sleep(1 * time.Millisecond)
n.Transcoder = n.TranscoderManager
s, _ := NewLivepeerServer("127.0.0.1:1938", n, true, "")
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
s, _ := NewLivepeerServer(ctx, "127.0.0.1:1938", n, true, "")
mux := s.cliWebServerHandlers("addr")
srv := httptest.NewServer(mux)
defer srv.Close()
Expand Down Expand Up @@ -220,7 +225,9 @@ func TestGetEthChainID(t *testing.T) {
err = dbh.SetChainID(big.NewInt(1))
require.Nil(err)
n, _ := core.NewLivepeerNode(&eth.StubClient{}, "./tmp", dbh)
s, _ := NewLivepeerServer("127.0.0.1:1938", n, true, "")
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
s, _ := NewLivepeerServer(ctx, "127.0.0.1:1938", n, true, "")
mux := s.cliWebServerHandlers("addr")
srv := httptest.NewServer(mux)
defer srv.Close()
Expand Down Expand Up @@ -308,7 +315,9 @@ func TestRegisteredOrchestrators(t *testing.T) {

n, _ := core.NewLivepeerNode(eth, "./tmp", dbh)

s, _ := NewLivepeerServer("127.0.0.1:1938", n, true, "")
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
s, _ := NewLivepeerServer(ctx, "127.0.0.1:1938", n, true, "")
mux := s.cliWebServerHandlers("addr")
srv := httptest.NewServer(mux)
defer srv.Close()
Expand Down
6 changes: 4 additions & 2 deletions server/mediaserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ type authWebhookResponse struct {
ForceSessionReinit bool `json:"forceSessionReinit"`
}

func NewLivepeerServer(rtmpAddr string, lpNode *core.LivepeerNode, httpIngest bool, transcodingOptions string) (*LivepeerServer, error) {
func NewLivepeerServer(ctx context.Context, rtmpAddr string, lpNode *core.LivepeerNode, httpIngest bool, transcodingOptions string) (*LivepeerServer, error) {
opts := lpmscore.LPMSOpts{
RtmpAddr: rtmpAddr,
RtmpDisabled: true,
Expand Down Expand Up @@ -201,7 +201,9 @@ func NewLivepeerServer(rtmpAddr string, lpNode *core.LivepeerNode, httpIngest bo
if lpNode.NodeType == core.BroadcasterNode && httpIngest {
opts.HttpMux.HandleFunc("/live/", ls.HandlePush)

startAIMediaServer(ls)
if err := startAIMediaServer(ctx, ls); err != nil {
return nil, fmt.Errorf("failed to start AI media server: %w", err)
}
}
opts.HttpMux.HandleFunc("/recordings/", ls.HandleRecordings)
return ls, nil
Expand Down
2 changes: 1 addition & 1 deletion server/mediaserver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func setupServerWithCancel() (*LivepeerServer, context.CancelFunc) {
}
n, _ := core.NewLivepeerNode(nil, "./tmp", nil)
// doesn't really starts server at 1938
S, _ = NewLivepeerServer("127.0.0.1:1938", n, true, "")
S, _ = NewLivepeerServer(ctx, "127.0.0.1:1938", n, true, "")
// rtmpurl := fmt.Sprintf("rtmp://127.0.0.1:%d", port)
// S, _ = NewLivepeerServer(rtmpurl, n, true, "")
// glog.Errorf("++> rtmp server with port %d", port)
Expand Down
8 changes: 4 additions & 4 deletions server/push_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -426,7 +426,7 @@ func TestPush_HTTPIngest(t *testing.T) {

ctx, cancel := context.WithCancel(context.Background())
// HTTP ingest disabled
s, _ := NewLivepeerServer("127.0.0.1:1938", n, false, "")
s, _ := NewLivepeerServer(ctx, "127.0.0.1:1938", n, false, "")
s.SetContextFromUnitTest(ctx)
h, pattern := s.HTTPMux.Handler(req)
assert.Equal("", pattern)
Expand All @@ -440,7 +440,7 @@ func TestPush_HTTPIngest(t *testing.T) {

ctx, cancel = context.WithCancel(context.Background())
// HTTP ingest enabled
s, _ = NewLivepeerServer("127.0.0.1:1938", n, true, "")
s, _ = NewLivepeerServer(ctx, "127.0.0.1:1938", n, true, "")
s.SetContextFromUnitTest(ctx)
h, pattern = s.HTTPMux.Handler(req)
assert.Equal("/live/", pattern)
Expand Down Expand Up @@ -1106,8 +1106,8 @@ func TestPush_OSPerStream(t *testing.T) {
assert := assert.New(t)
drivers.NodeStorage = drivers.NewMemoryDriver(nil)
n, _ := core.NewLivepeerNode(nil, "./tmp", nil)
s, _ := NewLivepeerServer("127.0.0.1:1939", n, true, "")
serverCtx, serverCancel := context.WithCancel(context.Background())
s, _ := NewLivepeerServer(serverCtx, "127.0.0.1:1939", n, true, "")
s.SetContextFromUnitTest(serverCtx)
defer serverCleanup(s)

Expand Down Expand Up @@ -1273,8 +1273,8 @@ func TestPush_ConcurrentSegments(t *testing.T) {
drivers.NodeStorage = drivers.NewMemoryDriver(nil)
n, _ := core.NewLivepeerNode(nil, "./tmp", nil)
n.NodeType = core.BroadcasterNode
s, _ := NewLivepeerServer("127.0.0.1:1938", n, true, "")
serverCtx, serverCancel := context.WithCancel(context.Background())
s, _ := NewLivepeerServer(serverCtx, "127.0.0.1:1938", n, true, "")
s.SetContextFromUnitTest(serverCtx)
oldURL := AuthWebhookURL
defer func() { AuthWebhookURL = oldURL }()
Expand Down

0 comments on commit 3acf4ea

Please sign in to comment.