Skip to content

Commit

Permalink
Merge pull request #73 from marcelGoerentz/add_termination_timeout
Browse files Browse the repository at this point in the history
Add termination timeout
  • Loading branch information
marcelGoerentz authored Jan 22, 2025
2 parents d20e67f + 1edf38a commit be4e539
Show file tree
Hide file tree
Showing 13 changed files with 294 additions and 217 deletions.
5 changes: 5 additions & 0 deletions src/buffer-interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import "io"

type BufferInterface interface {
StartBuffer(stream *Stream) error
StopBuffer()
CloseBuffer()
HandleByteOutput(stdOut io.ReadCloser)
PrepareBufferFolder(folder string) error
GetBufTmpFiles() []string
Expand All @@ -13,4 +15,7 @@ type BufferInterface interface {
CheckBufferFolder() (bool, error)
CheckBufferedFile(file string) (bool, error)
writeToPipe(file string) error
GetPipeReader() *io.PipeReader
GetStopChan() chan struct{}
SetStopChan(chan struct{})
}
223 changes: 161 additions & 62 deletions src/buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ type StreamBuffer struct {
IsThirdPartyBuffer bool
Stream *Stream // Reference to the parents struct
StopChan chan struct{}
CloseChan chan struct{}
LatestSegment int
OldSegments []string
PipeWriter *io.PipeWriter
Expand All @@ -40,14 +41,36 @@ const (

func (sb *StreamBuffer) StartBuffer(stream *Stream) error {
sb.Stream = stream
if err := sb.PrepareBufferFolder(stream.Folder + string(os.PathSeparator)); err != nil {
if err := sb.PrepareBufferFolder(filepath.Join(stream.Folder, "0.ts")); err != nil {
// If something went wrong when setting up the buffer storage don't run at all
stream.ReportError(err, BufferFolderError, "", true)
return err
}
return nil
}

func (sb *StreamBuffer) StopBuffer() {
close(sb.StopChan)
}

func (sb *StreamBuffer) CloseBuffer() {
sb.StopBuffer()
close(sb.CloseChan)
sb.RemoveBufferedFiles(filepath.Join(sb.Stream.Folder, "0.ts"))
}

func (sb *StreamBuffer) GetPipeReader() *io.PipeReader{
return sb.PipeReader
}

func (sb *StreamBuffer) GetStopChan() chan struct{} {
return sb.StopChan
}

func (sb *StreamBuffer) SetStopChan(stopChan chan struct{}) {
sb.StopChan = stopChan
}

/*
HandleByteOutput save the byte ouptut of the command or http request as files
*/
Expand All @@ -66,68 +89,125 @@ func (sb *StreamBuffer) HandleByteOutput(stdOut io.ReadCloser) {
var tmpFile string
reader := bufio.NewReader(stdOut)
for {
if init {
tmpFile = fmt.Sprintf("%s%d.ts", tmpFolder, tmpSegment)
f, err = bufferVFS.Create(tmpFile)
if err != nil {
select {
case <- sb.CloseChan:
// If the stream got stopped, stop the output
return
default:
if init {
tmpFile = fmt.Sprintf("%s%d.ts", tmpFolder, tmpSegment)
f, err = bufferVFS.Create(tmpFile)
if err != nil {
f.Close()
ShowError(err, CreateFileError)
sb.Stream.ReportError(err, CreateFileError, "", true)
return
}
init = false
}
n, err := reader.Read(buffer)
if n == 0 && err == nil {
continue
}
if err == io.EOF {
f.Close()
ShowError(err, CreateFileError)
sb.Stream.ReportError(err, CreateFileError, "", true)
ShowDebug("Buffer reached EOF!", 3)
sb.Stream.ReportError(err, EndOfFileError, "", true)
return
}
init = false
}
n, err := reader.Read(buffer)
if n == 0 && err == nil {
continue
}
if err == io.EOF {
f.Close()
ShowDebug("Buffer reached EOF!", 3)
sb.Stream.ReportError(err, EndOfFileError, "", true)
return
}
if err != nil {
f.Close()
bufferVFS.Remove(tmpFile)
sb.Stream.ReportError(err, ReadIntoBufferError, "", true)
return
}
if _, err := f.Write(buffer[:n]); err != nil {
f.Close()
bufferVFS.Remove(tmpFile)
sb.Stream.ReportError(err, WriteToBufferError, "", true)
return
}
fileSize += n
// Check if the file size exceeds the threshold
if fileSize >= TS_PACKAGE_MIN_SIZE*1024 {
tmpSegment++
tmpFile = fmt.Sprintf("%s%d.ts", tmpFolder, tmpSegment)
// Close the current file and create a new one
f.Close()
sb.LatestSegment = tmpSegment
f, err = bufferVFS.Create(tmpFile)
if err != nil {
f.Close()
sb.Stream.ReportError(err, CreateFileError, "", true)
bufferVFS.Remove(tmpFile)
sb.Stream.ReportError(err, ReadIntoBufferError, "", true)
return
}
fileSize = 0
if _, err := f.Write(buffer[:n]); err != nil {
f.Close()
bufferVFS.Remove(tmpFile)
sb.Stream.ReportError(err, WriteToBufferError, "", true)
return
}
fileSize += n
// Check if the file size exceeds the threshold
if fileSize >= TS_PACKAGE_MIN_SIZE*1024 {
tmpSegment++
tmpFile = fmt.Sprintf("%s%d.ts", tmpFolder, tmpSegment)
// Close the current file and create a new one
f.Close()
sb.LatestSegment = tmpSegment
f, err = bufferVFS.Create(tmpFile)
if err != nil {
f.Close()
sb.Stream.ReportError(err, CreateFileError, "", true)
return
}
fileSize = 0
}
}
}
}

func (sb *StreamBuffer) RemoveBufferedFiles(folder string) error {
test := filepath.Dir(folder)
if err := sb.FileSystem.RemoveAll(test); err != nil {
return fmt.Errorf("failed to remove buffer folder: %w", err)
}
return nil
}

/*
PrepareBufferFolder will clean the buffer folder and check if the folder exists
*/
func (sb *StreamBuffer) PrepareBufferFolder(folder string) error {
if err := sb.FileSystem.RemoveAll(getPlatformPath(folder)); err != nil {
return fmt.Errorf("failed to remove buffer folder: %w", err)
if err := sb.RemoveBufferedFiles(folder); err != nil {
return err
}

if err := sb.createBufferFolder(filepath.Dir(folder)); err != nil {
return fmt.Errorf("failed to create buffer folder: %w", err)
}

if err := checkVFSFolder(folder, sb.FileSystem); err != nil {
return fmt.Errorf("failed to check buffer folder: %w", err)
return nil
}

func (sb *StreamBuffer) createBufferFolder(path string) (err error) {

var debug string
_, err = sb.FileSystem.Stat(path)

if fsIsNotExistErr(err) {
// Folder does not exist, will now be created

// If we are on Windows and the cache location path is NOT on C:\ we need to create the volume it is located on
// Failure to do so here will result in a panic error and the stream not playing
if Settings.StoreBufferInRAM {
if sb.FileSystem.OSType() == avfs.OsWindows {
vm := sb.FileSystem.(avfs.VolumeManager)
pathIterator := avfs.NewPathIterator(sb.FileSystem, path)
if pathIterator.VolumeName() != "C:" {
vm.VolumeAdd(path)
}
}

err = sb.FileSystem.MkdirAll(path, 0755)
if err == nil {
debug = fmt.Sprintf("Create virtual filesystem Folder: %s", path)
ShowDebug(debug, 1)
} else {
return err
}

} else {
err = sb.FileSystem.MkdirAll(path, 0755)
if err == nil {
debug = fmt.Sprintf("Created folder on disk: %s", path)
ShowDebug(debug, 1)
} else {
return err
}
}

return nil
}

return nil
Expand All @@ -139,12 +219,11 @@ and returns a sorted list with the file names
*/
func (sb *StreamBuffer) GetBufTmpFiles() (tmpFiles []string) {

var tmpFolder = sb.Stream.Folder + string(os.PathSeparator)
var tmpFolder = sb.Stream.Folder
var fileIDs []float64

if _, err := sb.FileSystem.Stat(tmpFolder); !fsIsNotExistErr(err) {

files, err := sb.FileSystem.ReadDir(getPlatformPath(tmpFolder))
files, err := sb.FileSystem.ReadDir(tmpFolder)
if err != nil {
ShowError(err, 000)
return
Expand Down Expand Up @@ -183,17 +262,17 @@ func (sb *StreamBuffer) GetBufTmpFiles() (tmpFiles []string) {

func (sb *StreamBuffer) GetBufferedSize() (size int) {
size = 0
var tmpFolder = sb.Stream.Folder + string(os.PathSeparator)
var tmpFolder = sb.Stream.Folder
if _, err := sb.FileSystem.Stat(tmpFolder); !fsIsNotExistErr(err) {

files, err := sb.FileSystem.ReadDir(getPlatformPath(tmpFolder))
files, err := sb.FileSystem.ReadDir(tmpFolder)
if err != nil {
ShowError(err, 000)
return
}
for _, file := range files {
if !file.IsDir() && filepath.Ext(file.Name()) == ".ts" {
file_info, err := sb.FileSystem.Stat(getPlatformFile(tmpFolder + file.Name()))
file_info, err := sb.FileSystem.Stat(filepath.Join(tmpFolder, file.Name()))
if err == nil {
size += int(file_info.Size())
}
Expand Down Expand Up @@ -223,7 +302,6 @@ func (sb *StreamBuffer) addBufferedFilesToPipe() {
err := sb.writeToPipe(f) // Add file so it will be copied to the pipes
if err != nil {
sb.Stream.ReportError(err, 0, "", false)

}
sb.DeleteOldestSegment()
}
Expand Down Expand Up @@ -262,14 +340,35 @@ func (sb *StreamBuffer) CheckBufferedFile(file string) (bool, error) {

func (sb *StreamBuffer) writeToPipe(file string) error {
f, err := sb.FileSystem.Open(filepath.Join(sb.Stream.Folder, file))
if err != nil {
return err
}
_, err = io.Copy(sb.PipeWriter, f)
if err != nil {
f.Close()
sb.Stream.ReportError(err, 0, "", true)
if err != nil {
return err
}
defer f.Close()


buf := make([]byte, 4096) // 4KB buffer
for {
select {
case <- sb.StopChan:
// Pipe was closed quit writing to it
return nil
default:
n, err := f.Read(buf)
if err != nil && err != io.EOF {
sb.Stream.ReportError(err, 0, "", true) // TODO: Add error code
return err
} else if err == io.EOF {
return nil
}
if n == 0 {
break
}

_, err = sb.PipeWriter.Write(buf[:n])
if err != nil {
sb.Stream.ReportError(err, 0, "", true) // TODO: Add error code
return err
}
}
}
f.Close()
return nil
}
Loading

0 comments on commit be4e539

Please sign in to comment.