Skip to content

Commit

Permalink
Add doku and fix issue in the API
Browse files Browse the repository at this point in the history
  • Loading branch information
marcelGoerentz committed Nov 19, 2024
1 parent 36296d8 commit 0772c40
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 33 deletions.
82 changes: 50 additions & 32 deletions src/serveStream.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"sort"
"strconv"
"strings"
"sync"
"syscall"
"time"

Expand Down Expand Up @@ -52,8 +51,7 @@ func NewStreamManager() *StreamManager {
}

/*
StartStream starts the ffmpeg process for buffering a stream
StartStream starts the ffmpeg process for buffering a stream
It will check if the stream already exists
*/
func (sm *StreamManager) StartStream(streamInfo StreamInfo, w http.ResponseWriter) (clientID string, playlistID string) {
Expand Down Expand Up @@ -263,31 +261,36 @@ func (sm *StreamManager) StopStream(playlistID string, streamID string, clientID
sm.mu.Lock()
defer sm.mu.Unlock()

stream, exists := sm.playlists[playlistID].streams[streamID]
if exists {
//client := stream.clients[clientID]
//closeClientConnection(client.w)
delete(stream.clients, clientID)
showInfo(fmt.Sprintf("Streaming:Client left %s, total: %d", streamID, len(stream.clients)))
if len(stream.clients) == 0 {
stream.cmd.Process.Signal(syscall.SIGKILL)
stream.cmd.Wait()
deletPIDfromDisc(fmt.Sprintf("%d", stream.cmd.Process.Pid))
delete(sm.playlists[playlistID].streams, streamID)
showInfo(fmt.Sprintf("Streaming:Stopped streaming for %s", streamID))
var debug = fmt.Sprintf("Streaming Status:Remove temporary files (%s)", stream.Folder)
showDebug(debug, 1)

debug = fmt.Sprintf("Remove tmp folder:%s", stream.Folder)
showDebug(debug, 1)

if err := bufferVFS.RemoveAll(stream.Folder); err != nil {
ShowError(err, 4005)
playlist, exists := sm.playlists[playlistID]
if exists{
stream, exists := playlist.streams[streamID]
if exists {
delete(stream.clients, clientID)
showInfo(fmt.Sprintf("Streaming:Client left %s, total: %d", streamID, len(stream.clients)))
if len(stream.clients) == 0 {
stream.cmd.Process.Signal(syscall.SIGKILL)
stream.cmd.Wait()
deletPIDfromDisc(fmt.Sprintf("%d", stream.cmd.Process.Pid))
delete(sm.playlists[playlistID].streams, streamID)
showInfo(fmt.Sprintf("Streaming:Stopped streaming for %s", streamID))
var debug = fmt.Sprintf("Streaming Status:Remove temporary files (%s)", stream.Folder)
showDebug(debug, 1)

debug = fmt.Sprintf("Remove tmp folder:%s", stream.Folder)
showDebug(debug, 1)

if err := bufferVFS.RemoveAll(stream.Folder); err != nil {
ShowError(err, 4005)
}
}
}
if len(sm.playlists[playlistID].streams) == 0 {
delete(sm.playlists, playlistID)
}
}
}

/*
func closeClientConnection(w http.ResponseWriter) {
var once sync.Once
// Set the header
Expand All @@ -305,18 +308,15 @@ func closeClientConnection(w http.ResponseWriter) {
conn.Close()
}
}
}
} */

func (sm *StreamManager) stopStreamForAllClients(streamID string) {
sm.mu.Lock()
defer sm.mu.Unlock()
for _, playlist := range sm.playlists {
for playlistID, playlist := range sm.playlists {
stream, exists := playlist.streams[streamID]
if exists {
stream.cancel() // Cancel the context to stop all clients
//for _, client := range stream.clients {
// closeClientConnection(client.w)
//}
stream.cmd.Process.Signal(syscall.SIGKILL)
stream.cmd.Wait()
deletPIDfromDisc(fmt.Sprintf("%d", stream.cmd.Process.Pid))
Expand All @@ -330,6 +330,9 @@ func (sm *StreamManager) stopStreamForAllClients(streamID string) {
ShowError(err, 4005)
}
}
if len(playlist.streams) == 0 {
delete(sm.playlists, playlistID)
}
}
}

Expand All @@ -352,13 +355,16 @@ func (sm *StreamManager) ServeStream(streamInfo StreamInfo, w http.ResponseWrite
// If it was the first client start t
if len(stream.clients) == 1 {
// Send Data to the clients, this should run only once per stream
go serveStream(stream, r, sm.errorChan)
go serveStream(stream, sm.errorChan)
}

// Wait for the stream to close the context
<-stream.ctx.Done()
}

/*
This function retrieves the playlist ID and the stream ID from the given stream
*/
func (sm *StreamManager) getPlaylistIDandStreamID(stream *Stream) (string, string){
for playlistID, playlist := range sm.playlists {
for streamID, tmpStream := range playlist.streams {
Expand All @@ -371,7 +377,7 @@ func (sm *StreamManager) getPlaylistIDandStreamID(stream *Stream) (string, strin
return "", ""
}

func serveStream(stream *Stream, r *http.Request, errorChan chan ErrorInfo) {
func serveStream(stream *Stream, errorChan chan ErrorInfo) {
var oldSegments []string

for {
Expand All @@ -396,6 +402,10 @@ func serveStream(stream *Stream, r *http.Request, errorChan chan ErrorInfo) {
}
}

/*
This function retrieves the files within the buffer folder
and returns a sorted list with the file names
*/
func getBufTmpFiles(stream *Stream) (tmpFiles []string) {

var tmpFolder = stream.Folder
Expand Down Expand Up @@ -450,13 +460,20 @@ func deleteOldestSegment(stream *Stream, oldSegment string) {
}
}

/*
Check if the buffer folder exists
*/
func checkBufferFolder(stream *Stream) bool {
if _, err := bufferVFS.Stat(stream.Folder); fsIsNotExistErr(err) {
return false
}
return true
}


/*
This functions sends the buffered files to the clients
*/
func sendFileToClients(stream *Stream, fileName string, errorChan chan ErrorInfo) bool {
file, err := bufferVFS.Open(stream.Folder + fileName)
if err != nil {
Expand All @@ -480,14 +497,15 @@ func sendFileToClients(stream *Stream, fileName string, errorChan chan ErrorInfo
for clientID, client := range stream.clients {
showDebug(fmt.Sprintf("Sending file to client %s", fileName), 3)
if _, err := client.w.Write(buffer); err != nil {
//showInfo("DEBUG: Write to client is not working!")
//ShowError(err, 0) // TODO: Add error code!
errorChan <- ErrorInfo{SendFileError, stream, clientID}
}
}
return true
}

/*
This function is getting used by the API call for retrieving the currently used channels
*/
func getCurrentlyUsedChannels(sm *StreamManager, response *APIResponseStruct) error {
// should be nil but its always better to check
if response.ActiveStreams == nil {
Expand Down
2 changes: 1 addition & 1 deletion threadfin.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ var GitHub = GitHubStruct{Branch: "Main", User: "marcelGoerentz", Repo: "Threadf
const Name = "Threadfin"

// Version : Version, die Build Nummer wird in der main func geparst.
const Version = "1.7.2-beta"
const Version = "1.7.3-beta"

// DBVersion : Datanbank Version
const DBVersion = "0.5.0"
Expand Down

0 comments on commit 0772c40

Please sign in to comment.