Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add buffer termination timeout #74

Merged
merged 4 commits into from
Jan 22, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions src/buffer-interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import "io"

type BufferInterface interface {
StartBuffer(stream *Stream) error
StopBuffer()
CloseBuffer()
HandleByteOutput(stdOut io.ReadCloser)
PrepareBufferFolder(folder string) error
GetBufTmpFiles() []string
Expand All @@ -13,4 +15,7 @@ type BufferInterface interface {
CheckBufferFolder() (bool, error)
CheckBufferedFile(file string) (bool, error)
writeToPipe(file string) error
GetPipeReader() *io.PipeReader
GetStopChan() chan struct{}
SetStopChan(chan struct{})
}
223 changes: 161 additions & 62 deletions src/buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ type StreamBuffer struct {
IsThirdPartyBuffer bool
Stream *Stream // Reference to the parents struct
StopChan chan struct{}
CloseChan chan struct{}
LatestSegment int
OldSegments []string
PipeWriter *io.PipeWriter
Expand All @@ -40,14 +41,36 @@ const (

func (sb *StreamBuffer) StartBuffer(stream *Stream) error {
sb.Stream = stream
if err := sb.PrepareBufferFolder(stream.Folder + string(os.PathSeparator)); err != nil {
if err := sb.PrepareBufferFolder(filepath.Join(stream.Folder, "0.ts")); err != nil {
// If something went wrong when setting up the buffer storage don't run at all
stream.ReportError(err, BufferFolderError, "", true)
return err
}
return nil
}

func (sb *StreamBuffer) StopBuffer() {
close(sb.StopChan)
}

func (sb *StreamBuffer) CloseBuffer() {
sb.StopBuffer()
close(sb.CloseChan)
sb.RemoveBufferedFiles(filepath.Join(sb.Stream.Folder, "0.ts"))
}

func (sb *StreamBuffer) GetPipeReader() *io.PipeReader{
return sb.PipeReader
}

func (sb *StreamBuffer) GetStopChan() chan struct{} {
return sb.StopChan
}

func (sb *StreamBuffer) SetStopChan(stopChan chan struct{}) {
sb.StopChan = stopChan
}

/*
HandleByteOutput save the byte ouptut of the command or http request as files
*/
Expand All @@ -66,68 +89,125 @@ func (sb *StreamBuffer) HandleByteOutput(stdOut io.ReadCloser) {
var tmpFile string
reader := bufio.NewReader(stdOut)
for {
if init {
tmpFile = fmt.Sprintf("%s%d.ts", tmpFolder, tmpSegment)
f, err = bufferVFS.Create(tmpFile)
if err != nil {
select {
case <- sb.CloseChan:
// If the stream got stopped, stop the output
return
default:
if init {
tmpFile = fmt.Sprintf("%s%d.ts", tmpFolder, tmpSegment)
f, err = bufferVFS.Create(tmpFile)
if err != nil {
f.Close()
ShowError(err, CreateFileError)
sb.Stream.ReportError(err, CreateFileError, "", true)
return
}
init = false
}
n, err := reader.Read(buffer)
if n == 0 && err == nil {
continue
}
if err == io.EOF {
f.Close()
ShowError(err, CreateFileError)
sb.Stream.ReportError(err, CreateFileError, "", true)
ShowDebug("Buffer reached EOF!", 3)
sb.Stream.ReportError(err, EndOfFileError, "", true)
return
}
init = false
}
n, err := reader.Read(buffer)
if n == 0 && err == nil {
continue
}
if err == io.EOF {
f.Close()
ShowDebug("Buffer reached EOF!", 3)
sb.Stream.ReportError(err, EndOfFileError, "", true)
return
}
if err != nil {
f.Close()
bufferVFS.Remove(tmpFile)
sb.Stream.ReportError(err, ReadIntoBufferError, "", true)
return
}
if _, err := f.Write(buffer[:n]); err != nil {
f.Close()
bufferVFS.Remove(tmpFile)
sb.Stream.ReportError(err, WriteToBufferError, "", true)
return
}
fileSize += n
// Check if the file size exceeds the threshold
if fileSize >= TS_PACKAGE_MIN_SIZE*1024 {
tmpSegment++
tmpFile = fmt.Sprintf("%s%d.ts", tmpFolder, tmpSegment)
// Close the current file and create a new one
f.Close()
sb.LatestSegment = tmpSegment
f, err = bufferVFS.Create(tmpFile)
if err != nil {
f.Close()
sb.Stream.ReportError(err, CreateFileError, "", true)
bufferVFS.Remove(tmpFile)
sb.Stream.ReportError(err, ReadIntoBufferError, "", true)
return
}
fileSize = 0
if _, err := f.Write(buffer[:n]); err != nil {
f.Close()
bufferVFS.Remove(tmpFile)
sb.Stream.ReportError(err, WriteToBufferError, "", true)
return
}
fileSize += n
// Check if the file size exceeds the threshold
if fileSize >= TS_PACKAGE_MIN_SIZE*1024 {
tmpSegment++
tmpFile = fmt.Sprintf("%s%d.ts", tmpFolder, tmpSegment)
// Close the current file and create a new one
f.Close()
sb.LatestSegment = tmpSegment
f, err = bufferVFS.Create(tmpFile)
if err != nil {
f.Close()
sb.Stream.ReportError(err, CreateFileError, "", true)
return
}
fileSize = 0
}
}
}
}

func (sb *StreamBuffer) RemoveBufferedFiles(folder string) error {
test := filepath.Dir(folder)
if err := sb.FileSystem.RemoveAll(test); err != nil {
return fmt.Errorf("failed to remove buffer folder: %w", err)
}
return nil
}

/*
PrepareBufferFolder will clean the buffer folder and check if the folder exists
*/
func (sb *StreamBuffer) PrepareBufferFolder(folder string) error {
if err := sb.FileSystem.RemoveAll(getPlatformPath(folder)); err != nil {
return fmt.Errorf("failed to remove buffer folder: %w", err)
if err := sb.RemoveBufferedFiles(folder); err != nil {
return err
}

if err := sb.createBufferFolder(filepath.Dir(folder)); err != nil {
return fmt.Errorf("failed to create buffer folder: %w", err)
}

if err := checkVFSFolder(folder, sb.FileSystem); err != nil {
return fmt.Errorf("failed to check buffer folder: %w", err)
return nil
}

func (sb *StreamBuffer) createBufferFolder(path string) (err error) {

var debug string
_, err = sb.FileSystem.Stat(path)

if fsIsNotExistErr(err) {
// Folder does not exist, will now be created

// If we are on Windows and the cache location path is NOT on C:\ we need to create the volume it is located on
// Failure to do so here will result in a panic error and the stream not playing
if Settings.StoreBufferInRAM {
if sb.FileSystem.OSType() == avfs.OsWindows {
vm := sb.FileSystem.(avfs.VolumeManager)
pathIterator := avfs.NewPathIterator(sb.FileSystem, path)
if pathIterator.VolumeName() != "C:" {
vm.VolumeAdd(path)
}
}

err = sb.FileSystem.MkdirAll(path, 0755)
if err == nil {
debug = fmt.Sprintf("Create virtual filesystem Folder: %s", path)
ShowDebug(debug, 1)
} else {
return err
}

} else {
err = sb.FileSystem.MkdirAll(path, 0755)
if err == nil {
debug = fmt.Sprintf("Created folder on disk: %s", path)
ShowDebug(debug, 1)
} else {
return err
}
}

return nil
}

return nil
Expand All @@ -139,12 +219,11 @@ and returns a sorted list with the file names
*/
func (sb *StreamBuffer) GetBufTmpFiles() (tmpFiles []string) {

var tmpFolder = sb.Stream.Folder + string(os.PathSeparator)
var tmpFolder = sb.Stream.Folder
var fileIDs []float64

if _, err := sb.FileSystem.Stat(tmpFolder); !fsIsNotExistErr(err) {

files, err := sb.FileSystem.ReadDir(getPlatformPath(tmpFolder))
files, err := sb.FileSystem.ReadDir(tmpFolder)
if err != nil {
ShowError(err, 000)
return
Expand Down Expand Up @@ -183,17 +262,17 @@ func (sb *StreamBuffer) GetBufTmpFiles() (tmpFiles []string) {

func (sb *StreamBuffer) GetBufferedSize() (size int) {
size = 0
var tmpFolder = sb.Stream.Folder + string(os.PathSeparator)
var tmpFolder = sb.Stream.Folder
if _, err := sb.FileSystem.Stat(tmpFolder); !fsIsNotExistErr(err) {

files, err := sb.FileSystem.ReadDir(getPlatformPath(tmpFolder))
files, err := sb.FileSystem.ReadDir(tmpFolder)
if err != nil {
ShowError(err, 000)
return
}
for _, file := range files {
if !file.IsDir() && filepath.Ext(file.Name()) == ".ts" {
file_info, err := sb.FileSystem.Stat(getPlatformFile(tmpFolder + file.Name()))
file_info, err := sb.FileSystem.Stat(filepath.Join(tmpFolder, file.Name()))
if err == nil {
size += int(file_info.Size())
}
Expand Down Expand Up @@ -223,7 +302,6 @@ func (sb *StreamBuffer) addBufferedFilesToPipe() {
err := sb.writeToPipe(f) // Add file so it will be copied to the pipes
if err != nil {
sb.Stream.ReportError(err, 0, "", false)

}
sb.DeleteOldestSegment()
}
Expand Down Expand Up @@ -262,14 +340,35 @@ func (sb *StreamBuffer) CheckBufferedFile(file string) (bool, error) {

func (sb *StreamBuffer) writeToPipe(file string) error {
f, err := sb.FileSystem.Open(filepath.Join(sb.Stream.Folder, file))
if err != nil {
return err
}
_, err = io.Copy(sb.PipeWriter, f)
if err != nil {
f.Close()
sb.Stream.ReportError(err, 0, "", true)
if err != nil {
return err
}
defer f.Close()


buf := make([]byte, 4096) // 4KB buffer
for {
select {
case <- sb.StopChan:
// Pipe was closed quit writing to it
return nil
default:
n, err := f.Read(buf)
if err != nil && err != io.EOF {
sb.Stream.ReportError(err, 0, "", true) // TODO: Add error code
return err
} else if err == io.EOF {
return nil
}
if n == 0 {
break
}

_, err = sb.PipeWriter.Write(buf[:n])
if err != nil {
sb.Stream.ReportError(err, 0, "", true) // TODO: Add error code
return err
}
}
}
f.Close()
return nil
}
Loading