From 0772c40caba1e44b8b7bf8ba38b03357b68e3b5e Mon Sep 17 00:00:00 2001 From: Marcel Goerentz Date: Tue, 19 Nov 2024 22:25:22 +0100 Subject: [PATCH] Add doku and fix issue in the API --- src/serveStream.go | 82 ++++++++++++++++++++++++++++------------------ threadfin.go | 2 +- 2 files changed, 51 insertions(+), 33 deletions(-) diff --git a/src/serveStream.go b/src/serveStream.go index 28f5e5e..30e1a4e 100644 --- a/src/serveStream.go +++ b/src/serveStream.go @@ -10,7 +10,6 @@ import ( "sort" "strconv" "strings" - "sync" "syscall" "time" @@ -52,8 +51,7 @@ func NewStreamManager() *StreamManager { } /* -StartStream starts the ffmpeg process for buffering a stream - + StartStream starts the ffmpeg process for buffering a stream It will check if the stream already exists */ func (sm *StreamManager) StartStream(streamInfo StreamInfo, w http.ResponseWriter) (clientID string, playlistID string) { @@ -263,31 +261,36 @@ func (sm *StreamManager) StopStream(playlistID string, streamID string, clientID sm.mu.Lock() defer sm.mu.Unlock() - stream, exists := sm.playlists[playlistID].streams[streamID] - if exists { - //client := stream.clients[clientID] - //closeClientConnection(client.w) - delete(stream.clients, clientID) - showInfo(fmt.Sprintf("Streaming:Client left %s, total: %d", streamID, len(stream.clients))) - if len(stream.clients) == 0 { - stream.cmd.Process.Signal(syscall.SIGKILL) - stream.cmd.Wait() - deletPIDfromDisc(fmt.Sprintf("%d", stream.cmd.Process.Pid)) - delete(sm.playlists[playlistID].streams, streamID) - showInfo(fmt.Sprintf("Streaming:Stopped streaming for %s", streamID)) - var debug = fmt.Sprintf("Streaming Status:Remove temporary files (%s)", stream.Folder) - showDebug(debug, 1) - - debug = fmt.Sprintf("Remove tmp folder:%s", stream.Folder) - showDebug(debug, 1) - - if err := bufferVFS.RemoveAll(stream.Folder); err != nil { - ShowError(err, 4005) + playlist, exists := sm.playlists[playlistID] + if exists{ + stream, exists := playlist.streams[streamID] + if exists { + delete(stream.clients, clientID) + showInfo(fmt.Sprintf("Streaming:Client left %s, total: %d", streamID, len(stream.clients))) + if len(stream.clients) == 0 { + stream.cmd.Process.Signal(syscall.SIGKILL) + stream.cmd.Wait() + deletPIDfromDisc(fmt.Sprintf("%d", stream.cmd.Process.Pid)) + delete(sm.playlists[playlistID].streams, streamID) + showInfo(fmt.Sprintf("Streaming:Stopped streaming for %s", streamID)) + var debug = fmt.Sprintf("Streaming Status:Remove temporary files (%s)", stream.Folder) + showDebug(debug, 1) + + debug = fmt.Sprintf("Remove tmp folder:%s", stream.Folder) + showDebug(debug, 1) + + if err := bufferVFS.RemoveAll(stream.Folder); err != nil { + ShowError(err, 4005) + } } } + if len(sm.playlists[playlistID].streams) == 0 { + delete(sm.playlists, playlistID) + } } } +/* func closeClientConnection(w http.ResponseWriter) { var once sync.Once // Set the header @@ -305,18 +308,15 @@ func closeClientConnection(w http.ResponseWriter) { conn.Close() } } -} +} */ func (sm *StreamManager) stopStreamForAllClients(streamID string) { sm.mu.Lock() defer sm.mu.Unlock() - for _, playlist := range sm.playlists { + for playlistID, playlist := range sm.playlists { stream, exists := playlist.streams[streamID] if exists { stream.cancel() // Cancel the context to stop all clients - //for _, client := range stream.clients { - // closeClientConnection(client.w) - //} stream.cmd.Process.Signal(syscall.SIGKILL) stream.cmd.Wait() deletPIDfromDisc(fmt.Sprintf("%d", stream.cmd.Process.Pid)) @@ -330,6 +330,9 @@ func (sm *StreamManager) stopStreamForAllClients(streamID string) { ShowError(err, 4005) } } + if len(playlist.streams) == 0 { + delete(sm.playlists, playlistID) + } } } @@ -352,13 +355,16 @@ func (sm *StreamManager) ServeStream(streamInfo StreamInfo, w http.ResponseWrite // If it was the first client start t if len(stream.clients) == 1 { // Send Data to the clients, this should run only once per stream - go serveStream(stream, r, sm.errorChan) + go serveStream(stream, sm.errorChan) } // Wait for the stream to close the context <-stream.ctx.Done() } +/* + This function retrieves the playlist ID and the stream ID from the given stream +*/ func (sm *StreamManager) getPlaylistIDandStreamID(stream *Stream) (string, string){ for playlistID, playlist := range sm.playlists { for streamID, tmpStream := range playlist.streams { @@ -371,7 +377,7 @@ func (sm *StreamManager) getPlaylistIDandStreamID(stream *Stream) (string, strin return "", "" } -func serveStream(stream *Stream, r *http.Request, errorChan chan ErrorInfo) { +func serveStream(stream *Stream, errorChan chan ErrorInfo) { var oldSegments []string for { @@ -396,6 +402,10 @@ func serveStream(stream *Stream, r *http.Request, errorChan chan ErrorInfo) { } } +/* + This function retrieves the files within the buffer folder + and returns a sorted list with the file names +*/ func getBufTmpFiles(stream *Stream) (tmpFiles []string) { var tmpFolder = stream.Folder @@ -450,6 +460,9 @@ func deleteOldestSegment(stream *Stream, oldSegment string) { } } +/* + Check if the buffer folder exists +*/ func checkBufferFolder(stream *Stream) bool { if _, err := bufferVFS.Stat(stream.Folder); fsIsNotExistErr(err) { return false @@ -457,6 +470,10 @@ func checkBufferFolder(stream *Stream) bool { return true } + +/* + This functions sends the buffered files to the clients +*/ func sendFileToClients(stream *Stream, fileName string, errorChan chan ErrorInfo) bool { file, err := bufferVFS.Open(stream.Folder + fileName) if err != nil { @@ -480,14 +497,15 @@ func sendFileToClients(stream *Stream, fileName string, errorChan chan ErrorInfo for clientID, client := range stream.clients { showDebug(fmt.Sprintf("Sending file to client %s", fileName), 3) if _, err := client.w.Write(buffer); err != nil { - //showInfo("DEBUG: Write to client is not working!") - //ShowError(err, 0) // TODO: Add error code! errorChan <- ErrorInfo{SendFileError, stream, clientID} } } return true } +/* + This function is getting used by the API call for retrieving the currently used channels +*/ func getCurrentlyUsedChannels(sm *StreamManager, response *APIResponseStruct) error { // should be nil but its always better to check if response.ActiveStreams == nil { diff --git a/threadfin.go b/threadfin.go index 862cbd9..ab80f08 100644 --- a/threadfin.go +++ b/threadfin.go @@ -42,7 +42,7 @@ var GitHub = GitHubStruct{Branch: "Main", User: "marcelGoerentz", Repo: "Threadf const Name = "Threadfin" // Version : Version, die Build Nummer wird in der main func geparst. -const Version = "1.7.2-beta" +const Version = "1.7.3-beta" // DBVersion : Datanbank Version const DBVersion = "0.5.0"