From ac0ddc9e8a558be097f22a5db4f0520568ffa09a Mon Sep 17 00:00:00 2001 From: Alessandro Ros Date: Fri, 3 Jan 2025 13:08:46 +0100 Subject: [PATCH] playback: improve /list response time (#3637) (#4096) Response times of the /list endpoint were slow because the duration of each segment was computed from scratch by summing the duration of each of its parts. This is improved by storing the duration of the overall segment in the header and using that, if available. --- internal/conf/path.go | 7 ++ internal/playback/on_get.go | 4 +- internal/playback/on_list.go | 26 ++-- internal/playback/on_list_test.go | 149 +++++++++++++++++++++++ internal/playback/segment_fmp4.go | 72 +++++++---- internal/playback/segment_fmp4_test.go | 4 +- internal/recorder/format_fmp4_segment.go | 78 +++++++++++- 7 files changed, 296 insertions(+), 44 deletions(-) diff --git a/internal/conf/path.go b/internal/conf/path.go index a83ff64894b..fbeee61bf36 100644 --- a/internal/conf/path.go +++ b/internal/conf/path.go @@ -286,9 +286,11 @@ func (pconf *Path) validate( return fmt.Errorf("a path with a regular expression (or path 'all') and a static source" + " must have 'sourceOnDemand' set to true") } + if pconf.SRTPublishPassphrase != "" && pconf.Source != "publisher" { return fmt.Errorf("'srtPublishPassphase' can only be used when source is 'publisher'") } + if pconf.SourceOnDemand && pconf.Source == "publisher" { return fmt.Errorf("'sourceOnDemand' is useless when source is 'publisher'") } @@ -488,6 +490,11 @@ func (pconf *Path) validate( } } + // avoid overflowing DurationV0 of mvhd + if pconf.RecordSegmentDuration > Duration(24*time.Hour) { + return fmt.Errorf("maximum segment duration is 1 day") + } + // Authentication (deprecated) if deprecatedCredentialsMode { diff --git a/internal/playback/on_get.go b/internal/playback/on_get.go index da470f41bf1..350c24b6079 100644 --- a/internal/playback/on_get.go +++ b/internal/playback/on_get.go @@ -57,7 +57,7 @@ func seekAndMux( } defer f.Close() - firstInit, err = segmentFMP4ReadInit(f) + firstInit, _, err = segmentFMP4ReadHeader(f) if err != nil { return err } @@ -81,7 +81,7 @@ func seekAndMux( defer f.Close() var init *fmp4.Init - init, err = segmentFMP4ReadInit(f) + init, _, err = segmentFMP4ReadHeader(f) if err != nil { return err } diff --git a/internal/playback/on_list.go b/internal/playback/on_list.go index 7315adfd42f..7149ed71ded 100644 --- a/internal/playback/on_list.go +++ b/internal/playback/on_list.go @@ -4,7 +4,6 @@ import ( "encoding/json" "errors" "fmt" - "io" "net/http" "net/url" "os" @@ -29,7 +28,7 @@ type listEntry struct { URL string `json:"url"` } -func computeDurationAndConcatenate( +func readDurationAndConcatenate( recordFormat conf.RecordFormat, segments []*recordstore.Segment, ) ([]listEntry, error) { @@ -45,19 +44,18 @@ func computeDurationAndConcatenate( } defer f.Close() - init, err := segmentFMP4ReadInit(f) + init, duration, err := segmentFMP4ReadHeader(f) if err != nil { return err } - _, err = f.Seek(0, io.SeekStart) - if err != nil { - return err - } - - maxDuration, err := segmentFMP4ReadDuration(f, init) - if err != nil { - return err + // if duration is not present in the header, compute it + // by parsing each part + if duration == 0 { + duration, err = segmentFMP4ReadDurationFromParts(f, init) + if err != nil { + return err + } } if len(out) != 0 && segmentFMP4CanBeConcatenated( @@ -66,12 +64,12 @@ func computeDurationAndConcatenate( init, seg.Start) { prevStart := out[len(out)-1].Start - curEnd := seg.Start.Add(maxDuration) + curEnd := seg.Start.Add(duration) out[len(out)-1].Duration = listEntryDuration(curEnd.Sub(prevStart)) } else { out = append(out, listEntry{ Start: seg.Start, - Duration: listEntryDuration(maxDuration), + Duration: listEntryDuration(duration), }) } @@ -137,7 +135,7 @@ func (s *Server) onList(ctx *gin.Context) { return } - entries, err := computeDurationAndConcatenate(pathConf.RecordFormat, segments) + entries, err := readDurationAndConcatenate(pathConf.RecordFormat, segments) if err != nil { s.writeError(ctx, http.StatusInternalServerError, err) return diff --git a/internal/playback/on_list_test.go b/internal/playback/on_list_test.go index 2278ed4ea20..fc4e1d718ca 100644 --- a/internal/playback/on_list_test.go +++ b/internal/playback/on_list_test.go @@ -1,7 +1,10 @@ package playback import ( + "bytes" "encoding/json" + "fmt" + "io" "net/http" "net/url" "os" @@ -9,6 +12,8 @@ import ( "testing" "time" + "github.com/abema/go-mp4" + "github.com/bluenviron/mediacommon/pkg/formats/fmp4" "github.com/bluenviron/mediamtx/internal/conf" "github.com/bluenviron/mediamtx/internal/test" "github.com/stretchr/testify/require" @@ -206,3 +211,147 @@ func TestOnListDifferentInit(t *testing.T) { }, }, out) } + +func writeDuration(f io.ReadWriteSeeker, d time.Duration) error { + _, err := f.Seek(0, io.SeekStart) + if err != nil { + return err + } + + // check and skip ftyp header and content + + buf := make([]byte, 8) + _, err = io.ReadFull(f, buf) + if err != nil { + return err + } + + if !bytes.Equal(buf[4:], []byte{'f', 't', 'y', 'p'}) { + return fmt.Errorf("ftyp box not found") + } + + ftypSize := uint32(buf[0])<<24 | uint32(buf[1])<<16 | uint32(buf[2])<<8 | uint32(buf[3]) + + _, err = f.Seek(int64(ftypSize), io.SeekStart) + if err != nil { + return err + } + + // check and skip moov header + + _, err = io.ReadFull(f, buf) + if err != nil { + return err + } + + if !bytes.Equal(buf[4:], []byte{'m', 'o', 'o', 'v'}) { + return fmt.Errorf("moov box not found") + } + + moovSize := uint32(buf[0])<<24 | uint32(buf[1])<<16 | uint32(buf[2])<<8 | uint32(buf[3]) + + moovPos, err := f.Seek(8, io.SeekCurrent) + if err != nil { + return err + } + + var mvhd mp4.Mvhd + _, err = mp4.Unmarshal(f, uint64(moovSize-8), &mvhd, mp4.Context{}) + if err != nil { + return err + } + + mvhd.DurationV0 = uint32(d / time.Millisecond) + + _, err = f.Seek(moovPos, io.SeekStart) + if err != nil { + return err + } + + _, err = mp4.Marshal(f, &mvhd, mp4.Context{}) + if err != nil { + return err + } + + return nil +} + +func TestOnListCachedDuration(t *testing.T) { + dir, err := os.MkdirTemp("", "mediamtx-playback") + require.NoError(t, err) + defer os.RemoveAll(dir) + + err = os.Mkdir(filepath.Join(dir, "mypath"), 0o755) + require.NoError(t, err) + + func() { + var f *os.File + f, err = os.Create(filepath.Join(dir, "mypath", "2008-11-07_11-22-00-500000.mp4")) + require.NoError(t, err) + defer f.Close() + + init := fmp4.Init{ + Tracks: []*fmp4.InitTrack{ + { + ID: 1, + TimeScale: 90000, + Codec: &fmp4.CodecH264{ + SPS: test.FormatH264.SPS, + PPS: test.FormatH264.PPS, + }, + }, + }, + } + + err = init.Marshal(f) + require.NoError(t, err) + + err = writeDuration(f, 50*time.Second) + require.NoError(t, err) + }() + + s := &Server{ + Address: "127.0.0.1:9996", + ReadTimeout: conf.Duration(10 * time.Second), + PathConfs: map[string]*conf.Path{ + "mypath": { + Name: "mypath", + RecordPath: filepath.Join(dir, "%path/%Y-%m-%d_%H-%M-%S-%f"), + }, + }, + AuthManager: test.NilAuthManager, + Parent: test.NilLogger, + } + err = s.Initialize() + require.NoError(t, err) + defer s.Close() + + u, err := url.Parse("http://myuser:mypass@localhost:9996/list") + require.NoError(t, err) + + v := url.Values{} + v.Set("path", "mypath") + u.RawQuery = v.Encode() + + req, err := http.NewRequest(http.MethodGet, u.String(), nil) + require.NoError(t, err) + + res, err := http.DefaultClient.Do(req) + require.NoError(t, err) + defer res.Body.Close() + + require.Equal(t, http.StatusOK, res.StatusCode) + + var out interface{} + err = json.NewDecoder(res.Body).Decode(&out) + require.NoError(t, err) + + require.Equal(t, []interface{}{ + map[string]interface{}{ + "duration": float64(50), + "start": time.Date(2008, 11, 0o7, 11, 22, 0, 500000000, time.Local).Format(time.RFC3339Nano), + "url": "http://localhost:9996/get?duration=50&path=mypath&start=" + + url.QueryEscape(time.Date(2008, 11, 0o7, 11, 22, 0, 500000000, time.Local).Format(time.RFC3339Nano)), + }, + }, out) +} diff --git a/internal/playback/segment_fmp4.go b/internal/playback/segment_fmp4.go index 33fd659cb5c..97e70051cf5 100644 --- a/internal/playback/segment_fmp4.go +++ b/internal/playback/segment_fmp4.go @@ -60,68 +60,90 @@ func segmentFMP4CanBeConcatenated( !curStart.After(prevEnd.Add(concatenationTolerance)) } -func segmentFMP4ReadInit(r io.ReadSeeker) (*fmp4.Init, error) { +func segmentFMP4ReadHeader(r io.ReadSeeker) (*fmp4.Init, time.Duration, error) { + // check and skip ftyp + buf := make([]byte, 8) _, err := io.ReadFull(r, buf) if err != nil { - return nil, err + return nil, 0, err } - // find ftyp - if !bytes.Equal(buf[4:], []byte{'f', 't', 'y', 'p'}) { - return nil, fmt.Errorf("ftyp box not found") + return nil, 0, fmt.Errorf("ftyp box not found") } ftypSize := uint32(buf[0])<<24 | uint32(buf[1])<<16 | uint32(buf[2])<<8 | uint32(buf[3]) _, err = r.Seek(int64(ftypSize), io.SeekStart) if err != nil { - return nil, err + return nil, 0, err } - // find moov + // check moov _, err = io.ReadFull(r, buf) if err != nil { - return nil, err + return nil, 0, err } if !bytes.Equal(buf[4:], []byte{'m', 'o', 'o', 'v'}) { - return nil, fmt.Errorf("moov box not found") + return nil, 0, fmt.Errorf("moov box not found") } moovSize := uint32(buf[0])<<24 | uint32(buf[1])<<16 | uint32(buf[2])<<8 | uint32(buf[3]) - _, err = r.Seek(0, io.SeekStart) + // skip moov header + + _, err = r.Seek(8, io.SeekCurrent) if err != nil { - return nil, err + return nil, 0, err } - buf = make([]byte, ftypSize+moovSize) + // read mvhd + + var mvhd mp4.Mvhd + mvhdSize, err := mp4.Unmarshal(r, uint64(moovSize-8), &mvhd, mp4.Context{}) + if err != nil { + return nil, 0, err + } + + mvhdSize += 8 // add mvhd header + moovSize -= 8 + uint32(mvhdSize) // remove moov header and mvhd + + d := time.Duration(mvhd.DurationV0) * time.Second / time.Duration(mvhd.Timescale) + + // read tracks + + buf = make([]byte, uint64(moovSize)) _, err = io.ReadFull(r, buf) if err != nil { - return nil, err + return nil, 0, err } var init fmp4.Init err = init.Unmarshal(bytes.NewReader(buf)) if err != nil { - return nil, err + return nil, 0, err } - return &init, nil + return &init, d, nil } -func segmentFMP4ReadDuration( +func segmentFMP4ReadDurationFromParts( r io.ReadSeeker, init *fmp4.Init, ) (time.Duration, error) { - // find and skip ftyp + _, err := r.Seek(0, io.SeekStart) + if err != nil { + return 0, err + } + + // check and skip ftyp buf := make([]byte, 8) - _, err := io.ReadFull(r, buf) + _, err = io.ReadFull(r, buf) if err != nil { return 0, err } @@ -137,7 +159,7 @@ func segmentFMP4ReadDuration( return 0, err } - // find and skip moov + // check and skip moov _, err = io.ReadFull(r, buf) if err != nil { @@ -232,17 +254,19 @@ func segmentFMP4ReadDuration( // foreach traf +outer: for { _, err := io.ReadFull(r, buf) if err != nil { return 0, err } - if !bytes.Equal(buf[4:], []byte{'t', 'r', 'a', 'f'}) { - if bytes.Equal(buf[4:], []byte{'m', 'd', 'a', 't'}) { - break - } - return 0, fmt.Errorf("traf box not found") + switch { + case bytes.Equal(buf[4:], []byte{'t', 'r', 'a', 'f'}): + case bytes.Equal(buf[4:], []byte{'m', 'd', 'a', 't'}): + break outer + default: + return 0, fmt.Errorf("unexpected box %x", buf[4:8]) } // parse tfhd diff --git a/internal/playback/segment_fmp4_test.go b/internal/playback/segment_fmp4_test.go index 178af56257d..6314f9395b6 100644 --- a/internal/playback/segment_fmp4_test.go +++ b/internal/playback/segment_fmp4_test.go @@ -48,7 +48,7 @@ func writeBenchInit(f io.WriteSeeker) { } } -func BenchmarkFMP4ReadInit(b *testing.B) { +func BenchmarkFMP4ReadHeader(b *testing.B) { f, err := os.CreateTemp(os.TempDir(), "mediamtx-playback-fmp4-") if err != nil { panic(err) @@ -66,7 +66,7 @@ func BenchmarkFMP4ReadInit(b *testing.B) { } defer f.Close() - _, err = segmentFMP4ReadInit(f) + _, _, err = segmentFMP4ReadHeader(f) if err != nil { panic(err) } diff --git a/internal/recorder/format_fmp4_segment.go b/internal/recorder/format_fmp4_segment.go index 68b5f50a4a4..49a2a9897a4 100644 --- a/internal/recorder/format_fmp4_segment.go +++ b/internal/recorder/format_fmp4_segment.go @@ -1,10 +1,13 @@ package recorder import ( + "bytes" + "fmt" "io" "os" "time" + "github.com/abema/go-mp4" "github.com/bluenviron/mediacommon/pkg/formats/fmp4" "github.com/bluenviron/mediacommon/pkg/formats/fmp4/seekablebuffer" @@ -31,6 +34,70 @@ func writeInit(f io.Writer, tracks []*formatFMP4Track) error { return err } +func writeDuration(f io.ReadWriteSeeker, d time.Duration) error { + _, err := f.Seek(0, io.SeekStart) + if err != nil { + return err + } + + // check and skip ftyp header and content + + buf := make([]byte, 8) + _, err = io.ReadFull(f, buf) + if err != nil { + return err + } + + if !bytes.Equal(buf[4:], []byte{'f', 't', 'y', 'p'}) { + return fmt.Errorf("ftyp box not found") + } + + ftypSize := uint32(buf[0])<<24 | uint32(buf[1])<<16 | uint32(buf[2])<<8 | uint32(buf[3]) + + _, err = f.Seek(int64(ftypSize), io.SeekStart) + if err != nil { + return err + } + + // check and skip moov header + + _, err = io.ReadFull(f, buf) + if err != nil { + return err + } + + if !bytes.Equal(buf[4:], []byte{'m', 'o', 'o', 'v'}) { + return fmt.Errorf("moov box not found") + } + + moovSize := uint32(buf[0])<<24 | uint32(buf[1])<<16 | uint32(buf[2])<<8 | uint32(buf[3]) + + moovPos, err := f.Seek(8, io.SeekCurrent) + if err != nil { + return err + } + + var mvhd mp4.Mvhd + _, err = mp4.Unmarshal(f, uint64(moovSize-8), &mvhd, mp4.Context{}) + if err != nil { + return err + } + + mvhd.DurationV0 = uint32(d / time.Millisecond) + + _, err = f.Seek(moovPos, io.SeekStart) + if err != nil { + return err + } + + _, err = mp4.Marshal(f, &mvhd, mp4.Context{}) + if err != nil { + return err + } + + return nil +} + type formatFMP4Segment struct { f *formatFMP4 startDTS time.Duration @@ -55,13 +122,20 @@ func (s *formatFMP4Segment) close() error { if s.fi != nil { s.f.ri.Log(logger.Debug, "closing segment %s", s.path) - err2 := s.fi.Close() + + // write overall duration in the header in order to speed up the playback server + duration := s.lastDTS - s.startDTS + err2 := writeDuration(s.fi, duration) + if err == nil { + err = err2 + } + + err2 = s.fi.Close() if err == nil { err = err2 } if err2 == nil { - duration := s.lastDTS - s.startDTS s.f.ri.rec.OnSegmentComplete(s.path, duration) } }