Skip to content

Commit

Permalink
Not supporting VLC anymore! Add new / old Threadfin option again
Browse files Browse the repository at this point in the history
  • Loading branch information
marcelGoerentz committed Nov 22, 2024
1 parent 4596167 commit 0184ef5
Show file tree
Hide file tree
Showing 10 changed files with 360 additions and 246 deletions.
4 changes: 2 additions & 2 deletions html/lang/en.json
Original file line number Diff line number Diff line change
Expand Up @@ -423,10 +423,10 @@
},
"streamBuffering": {
"title": "Stream Buffer",
"description": "Functions of the buffer:<br>- The stream is passed from FFmpeg or VLC to Plex, Emby, Jellyfin or M3U Player<br>- Small jerking of the streams can be compensated<br>- HLS / M3U8 support<br>- RTP / RTPS support<br>- Re-streaming<br>- Separate tuner limit for each playlist",
"description": "Functions of the buffer:<br>- The stream is passed from FFmpeg or Threadfin to Plex, Emby, Jellyfin or M3U Player<br>- Small jerking of the streams can be compensated<br>- HLS / M3U8 support<br>- RTP / RTPS support<br>- Re-streaming<br>- Separate tuner limit for each playlist",
"info_false": "No Buffer (Client connects directly to the streaming server)",
"info_ffmpeg": "FFmpeg connects to the streaming server",
"info_vlc": "VLC connects to the streaming server"
"info_threadfin": "Threadfin connects to the streaming server"
},
"udpxy": {
"title": "UDPxy address",
Expand Down
194 changes: 194 additions & 0 deletions src/buffer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,194 @@
package src

import (
"bufio"
"fmt"
"io"
"net"
"strings"

"github.com/avfs/avfs"
"github.com/avfs/avfs/vfs/memfs"
"github.com/avfs/avfs/vfs/osfs"
)

/*
InitBufferVFS will set the bufferVFS variable
*/
func InitBufferVFS(virtual bool) {

if virtual {
bufferVFS = memfs.New()
} else {
bufferVFS = osfs.New()
}

}

/*
GetBufferConfig reutrns the the arguments from the buffer settings
*/
func GetBufferConfig() (bufferType, path, options string) {
bufferType = strings.ToUpper(Settings.Buffer)
switch bufferType {
case "FFMPEG":
return bufferType, Settings.FFmpegPath, Settings.FFmpegOptions
case "THREADFIN":
return bufferType, "", ""
default:
return "", "", ""
}
}

func StartBuffer(stream *Stream, useBackup bool, backupNumber int, errorChan chan ErrorInfo) *Buffer {
if useBackup {
UpdateStreamURLForBackup(stream, backupNumber)
}

bufferType, path, options := GetBufferConfig()
if bufferType == "" {
return nil
}

if err := PrepareBufferFolder(stream.Folder); err != nil {
ShowError(err, 4008)
HandleBufferError(err, backupNumber, useBackup, stream, errorChan)
return nil
}

showInfo(fmt.Sprintf("%s path:%s", bufferType, path))
showInfo("Streaming URL:" + stream.URL)

switch Settings.Buffer {
case "ffmpeg":
if buffer, err := RunBufferCommand(bufferType, path, options, stream, errorChan); err != nil {
return HandleBufferError(err, backupNumber, useBackup, stream, errorChan)
} else {
return buffer
}
case "threadfin":
if buffer, err := StartThreadfinBuffer(stream, useBackup, backupNumber, errorChan); err != nil {
return HandleBufferError(err, backupNumber, useBackup, stream, errorChan)
} else {
return buffer
}
default:
return nil
}
}

/*
HandleBufferError will retry running the Buffer function with the next backup number
*/
func HandleBufferError(err error, backupNumber int, useBackup bool, stream *Stream, errorChan chan ErrorInfo) *Buffer {
ShowError(err, 4011)
if !useBackup || (useBackup && backupNumber >= 0 && backupNumber <= 3) {
backupNumber++
if stream.BackupChannel1URL != "" || stream.BackupChannel2URL != "" || stream.BackupChannel3URL != "" {
return StartBuffer(stream, true, backupNumber, errorChan)
}
}
return nil
}

/*
HandleByteOutput save the byte ouptut of the command or http request as files
*/
func HandleByteOutput(stdOut io.ReadCloser, stream *Stream, errorChan chan ErrorInfo) {
bufferSize := Settings.BufferSize * 1024 // Puffergröße in Bytes
buffer := make([]byte, bufferSize)
var fileSize int
init := true
tmpFolder := stream.Folder
tmpSegment := 1

var f avfs.File
var err error
var tmpFile string
reader := bufio.NewReader(stdOut)
for {
if init {
tmpFile = fmt.Sprintf("%s%d.ts", tmpFolder, tmpSegment)
f, err = bufferVFS.Create(tmpFile)
if err != nil {
f.Close()
ShowError(err, 4010)
errorChan <- ErrorInfo{CreateFileError, stream, ""}
return
}
init = false
}
n, err := reader.Read(buffer)
if err == io.EOF {
f.Close()
showDebug("Buffer reached EOF!", 3)
errorChan <- ErrorInfo{EndOfFileError, stream, ""}
return
}
if err != nil {
if _, ok := err.(*net.OpError); !ok || stream.Buffer.isThirdPartyBuffer {
ShowError(err, 4012)
}
f.Close()
errorChan <- ErrorInfo{ReadIntoBufferError, stream, ""}
return
}
if _, err := f.Write(buffer[:n]); err != nil {
ShowError(err, 4013)
f.Close()
errorChan <- ErrorInfo{WriteToBufferError, stream, ""}
return
}
fileSize += n
// Prüfen, ob Dateigröße den Puffer überschreitet
if fileSize >= bufferSize {
tmpSegment++
tmpFile = fmt.Sprintf("%s%d.ts", tmpFolder, tmpSegment)
// Datei schließen und neue Datei öffnen
f.Close()
f, err = bufferVFS.Create(tmpFile)
if err != nil {
f.Close()
ShowError(err, 4010)
errorChan <- ErrorInfo{CreateFileError, stream, ""}
return
}
fileSize = 0
}
}
}

/*
UpdateStreamURLForBackup will set the ther stream url when a backup will be used
*/
func UpdateStreamURLForBackup(stream *Stream, backupNumber int) {
switch backupNumber {
case 1:
stream.URL = stream.BackupChannel1URL
showHighlight("START OF BACKUP 1 STREAM")
showInfo("Backup Channel 1 URL: " + stream.URL)
case 2:
stream.URL = stream.BackupChannel2URL
showHighlight("START OF BACKUP 2 STREAM")
showInfo("Backup Channel 2 URL: " + stream.URL)
case 3:
stream.URL = stream.BackupChannel3URL
showHighlight("START OF BACKUP 3 STREAM")
showInfo("Backup Channel 3 URL: " + stream.URL)
}
}

/*
PrepareBufferFolder will clean the buffer folder and check if the folder exists
*/
func PrepareBufferFolder(folder string) error {
if err := bufferVFS.RemoveAll(getPlatformPath(folder)); err != nil {
return fmt.Errorf("failed to remove buffer folder: %w", err)
}

if err := checkVFSFolder(folder, bufferVFS); err != nil {
return fmt.Errorf("failed to check buffer folder: %w", err)
}

return nil
}
2 changes: 2 additions & 0 deletions src/screen.go
Original file line number Diff line number Diff line change
Expand Up @@ -383,6 +383,8 @@ func getErrMsg(errCode int) (errMsg string) {
errMsg = "Could not get file statics of buffered file"
case 4016:
errMsg = "Could not read buffered file before sending to clients"
case 4017:
errMsg = "Cannot stream from M3U file, please use ffmpeg"


// PID saving and deleting
Expand Down
52 changes: 30 additions & 22 deletions src/serveStream.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func NewStreamManager() *StreamManager {
if errorInfo.ClientID != "" {
// Client specifc errors
sm.StopStream(playlistID, streamID, errorInfo.ClientID)
/*} else {
/*} else {
if streamID != "" {
// Buffer errors
if errorInfo.ErrorCode != EndOfFileError {
Expand All @@ -48,8 +48,9 @@ func NewStreamManager() *StreamManager {
sm.StopStreamForAllClients(streamID)
}*/
} else {
for clientID,_ := range errorInfo.Stream.clients {
for clientID, client := range errorInfo.Stream.clients {
sm.StopStream(playlistID, streamID, clientID)
CloseClientConnection(client.w)
}
}
}
Expand Down Expand Up @@ -123,7 +124,7 @@ func CreateStream(streamInfo StreamInfo, errorChan chan ErrorInfo) *Stream {
folder := System.Folder.Temp + streamInfo.PlaylistID + string(os.PathSeparator) + streamInfo.URLid + string(os.PathSeparator)
stream := &Stream{
name: streamInfo.Name,
cmd: nil,
Buffer: nil,
ctx: ctx,
cancel: cancel,
URL: streamInfo.URL,
Expand All @@ -133,8 +134,8 @@ func CreateStream(streamInfo StreamInfo, errorChan chan ErrorInfo) *Stream {
Folder: folder,
clients: make(map[string]Client),
}
cmd := Buffer(stream, false, 0, errorChan)
stream.cmd = cmd
buffer := StartBuffer(stream, false, 0, errorChan)
stream.Buffer = buffer
return stream
}

Expand Down Expand Up @@ -275,12 +276,20 @@ It will use the third party tool defined in the settings and starts a process fo
*/
func CreateAlternativNoMoreStreamsVideo(pathToFile string) error {
cmd := new(exec.Cmd)
arguments := []string{"-loop", "1", "-i", pathToFile, "-c:v", "libx264", "-t", "1", "-pix_fmt", "yuv420p", "-vf", "scale=1920:1080", fmt.Sprintf("%sstream-limit.ts", System.Folder.Video)}
switch Settings.Buffer {
case "ffmpeg":
cmd = exec.Command(Settings.FFmpegPath, "-loop", "1", "-i", pathToFile, "-c:v", "libx264", "-t", "1", "-pix_fmt", "yuv420p", "-vf", "scale=1920:1080", fmt.Sprintf("%sstream-limit.ts", System.Folder.Video))
case "vlc":
cmd = exec.Command(Settings.VLCPath, "--no-audio", "--loop", "--sout", fmt.Sprintf("'#transcode{vcodec=h264,vb=1024,scale=1,width=1920,height=1080,acodec=none,venc=x264{preset=ultrafast}}:standard{access=file,mux=ts,dst=%sstream-limit.ts}'", System.Folder.Video), System.Folder.Video, pathToFile)

cmd = exec.Command(Settings.FFmpegPath, arguments...)
default:
if Settings.FFmpegPath != "" {
if _, err := os.Stat(Settings.FFmpegPath); err != nil {
return fmt.Errorf("ffmpeg path is not valid. Can not convert custom image to video")
} else {
cmd = exec.Command(Settings.FFmpegPath, arguments...)
}
} else {
return fmt.Errorf("no ffmpeg path given")
}
}
if len(cmd.Args) > 0 {
showInfo("Streaming Status:Creating video from uploaded image for a customized no more stream video")
Expand Down Expand Up @@ -310,10 +319,14 @@ func (sm *StreamManager) StopStream(playlistID string, streamID string, clientID
delete(stream.clients, clientID)
showInfo(fmt.Sprintf("Streaming:Client left %s, total: %d", streamID, len(stream.clients)))
if len(stream.clients) == 0 {
stream.cancel() // Tell everyone about the ending of the stream
stream.cmd.Process.Signal(syscall.SIGKILL) // Kill the third party tool process
stream.cmd.Wait()
DeletPIDfromDisc(fmt.Sprintf("%d", stream.cmd.Process.Pid)) // Delet
stream.cancel() // Tell everyone about the ending of the stream
if stream.Buffer.isThirdPartyBuffer {
stream.Buffer.cmd.Process.Signal(syscall.SIGKILL) // Kill the third party tool process
stream.Buffer.cmd.Wait()
DeletPIDfromDisc(fmt.Sprintf("%d", stream.Buffer.cmd.Process.Pid)) // Delete the PID since the process has been terminated
} else {
close(stream.Buffer.stopChan)
}
delete(sm.playlists[playlistID].streams, streamID)
showInfo(fmt.Sprintf("Streaming:Stopped streaming for %s", streamID))
var debug = fmt.Sprintf("Streaming:Remove temporary files (%s)", stream.Folder)
Expand All @@ -333,14 +346,9 @@ func (sm *StreamManager) StopStream(playlistID string, streamID string, clientID
}
}

/*
func CloseClientConnection(w http.ResponseWriter) {
var once sync.Once
// Set the header
w.Header().Set("Connection", "close")
once.Do(func() {
w.WriteHeader(http.StatusNotFound) // Einmaliger Aufruf von WriteHeader
})
// Set the header
w.Header().Set("Connection", "close")
// Close the connection explicitly
if flusher, ok := w.(http.Flusher); ok {
flusher.Flush()
Expand All @@ -351,7 +359,7 @@ func CloseClientConnection(w http.ResponseWriter) {
conn.Close()
}
}
} */
}

/*
StopStreamForAllClient stops the third paryt tool process and will delete all clients from the given stream
Expand Down Expand Up @@ -440,7 +448,7 @@ func SendData(stream *Stream, errorChan chan ErrorInfo) {
return
}
oldSegments = append(oldSegments, f)
showDebug(fmt.Sprintf("Streaming:Sending file %s to clients", f), 2)
showDebug(fmt.Sprintf("Streaming:Sending file %s to clients", f), 1)
if !SendFileToClients(stream, f, errorChan) {
return
}
Expand Down
16 changes: 11 additions & 5 deletions src/struct-buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
type StreamManager struct {
playlists map[string]*Playlist
errorChan chan ErrorInfo
stopChan chan bool
stopChan chan bool
mu sync.Mutex
}

Expand All @@ -24,7 +24,7 @@ type Playlist struct {
type Stream struct {
name string
clients map[string]Client
cmd *exec.Cmd
Buffer *Buffer
ctx context.Context
cancel context.CancelFunc

Expand All @@ -36,15 +36,21 @@ type Stream struct {
BackupChannel3URL string
}

type Buffer struct {
isThirdPartyBuffer bool
cmd *exec.Cmd
stopChan chan struct{}
}

type Client struct {
w http.ResponseWriter
r *http.Request
}

type ErrorInfo struct {
ErrorCode int
Stream *Stream
ClientID string
ErrorCode int
Stream *Stream
ClientID string
}

const (
Expand Down
Loading

0 comments on commit 0184ef5

Please sign in to comment.