Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add wait function for buffer #78

Merged
merged 3 commits into from
Jan 24, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Binary file added resources/images/stream-loading.jpg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added resources/video/stream-loading.ts
Binary file not shown.
63 changes: 35 additions & 28 deletions src/buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
type StreamBuffer struct {
FileSystem avfs.VFS
Stream *Stream // Reference to the parents struct
RealData bool
StopChan chan struct{}
Stopped bool
CloseChan chan struct{}
Expand Down Expand Up @@ -143,6 +144,7 @@ func (sb *StreamBuffer) HandleByteOutput(stdOut io.ReadCloser) {
tmpFile = fmt.Sprintf("%s%d.ts", tmpFolder, tmpSegment)
// Close the current file and create a new one
f.Close()
sb.RealData = true
sb.LatestSegment = tmpSegment
f, err = bufferVFS.Create(tmpFile)
if err != nil {
Expand Down Expand Up @@ -292,27 +294,41 @@ func (sb *StreamBuffer) GetBufferedSize() (size int) {
}

func (sb *StreamBuffer) addBufferedFilesToPipe() {
var waitContent []byte
var contentOk = false
if value, ok := webUI["web/public/video/stream-loading.ts"]; ok {
contentOk = true
waitContent = GetHTMLString(value.(string))
}

for {
select {
case <-sb.StopChan:
return
default:
if sb.GetBufferedSize() < Settings.BufferSize * 1024 {
time.Sleep(25 * time.Millisecond) // Wait for new files
continue
}
tmpFiles := sb.GetBufTmpFiles()
for _, f := range tmpFiles {
if ok, err := sb.CheckBufferFolder(); !ok {
sb.Stream.ReportError(err, BufferFolderError, "", true)
return
if !sb.RealData {
if contentOk {
sb.writeBytesToPipe(waitContent)
time.Sleep(800 * time.Millisecond)
}
ShowDebug(fmt.Sprintf("Streaming:Broadcasting file %s to clients", f), 1)
err := sb.writeToPipe(f) // Add file so it will be copied to the pipes
if err != nil {
sb.Stream.ReportError(err, 0, "", false)
} else {
if sb.GetBufferedSize() < Settings.BufferSize * 1024 {
time.Sleep(25 * time.Millisecond) // Wait for new files
continue
}
tmpFiles := sb.GetBufTmpFiles()
for _, f := range tmpFiles {
if ok, err := sb.CheckBufferFolder(); !ok {
sb.Stream.ReportError(err, BufferFolderError, "", true)
return
}
ShowDebug(fmt.Sprintf("Streaming:Broadcasting file %s to clients", f), 1)
err := sb.writeToPipe(f) // Add file so it will be copied to the pipes
if err != nil {
sb.Stream.ReportError(err, 0, "", false)
}
sb.DeleteOldestSegment()
}
sb.DeleteOldestSegment()
}
}
}
Expand Down Expand Up @@ -382,19 +398,10 @@ func (sb *StreamBuffer) writeToPipe(file string) error {
}
}

func (sb *StreamBuffer) writeBytesToPipe(data []byte) error {
for {
select {
case <- sb.StopChan:
// Pipe was closed quit writing to it
return nil
default:
_, err := sb.PipeWriter.Write(data)
if err != nil {
sb.Stream.ReportError(err, 0, "", true) // TODO: Add error code
return err
}
time.Sleep(800 * time.Millisecond)
}
func (sb *StreamBuffer) writeBytesToPipe(data []byte) (err error) {
_, err = sb.PipeWriter.Write(data)
if err != nil {
sb.Stream.ReportError(err, 0, "", true) // TODO: Add error code
}
return
}
11 changes: 10 additions & 1 deletion src/streamManager.go
Original file line number Diff line number Diff line change
Expand Up @@ -379,10 +379,19 @@ func HandleStreamLimit(stream *Stream) {
ShowInfo("Streaming Status:No new connections available. Tuner limit reached.")
ShowInfo("Streaming limit reached content instead")
content, contentOk := GetStreamLimitContent()
var stopChannel = stream.Buffer.GetStopChan()
if contentOk {
// Write content to the pipe in a loop
go func() {
stream.Buffer.writeBytesToPipe(content)
for {
select {
case <- stopChannel:
return
default:
stream.Buffer.writeBytesToPipe(content)
time.Sleep(750 * time.Millisecond)
}
}
}()
}
}
Expand Down
1 change: 1 addition & 0 deletions src/webUI.go

Large diffs are not rendered by default.

Binary file added web/public/video/stream-loading.ts
Binary file not shown.