diff --git a/internal/activities/video.go b/internal/activities/video.go index 64e16bc1..888a81cc 100644 --- a/internal/activities/video.go +++ b/internal/activities/video.go @@ -287,14 +287,14 @@ func DownloadTwitchVideo(ctx context.Context, input dto.ArchiveVideoInput) error } // Create a new context with cancel - ctx, cancel := context.WithCancel(ctx) + heartbeatCtx, cancel := context.WithCancel(context.Background()) defer cancel() // Make sure to cancel when download is complete // Start a goroutine that sends a heartbeat every 30 seconds go func() { for { select { - case <-ctx.Done(): + case <-heartbeatCtx.Done(): // If the context is done, stop the goroutine return default: @@ -310,17 +310,20 @@ func DownloadTwitchVideo(ctx context.Context, input dto.ArchiveVideoInput) error if err != nil { _, dbErr := database.DB().Client.Queue.UpdateOneID(input.Queue.ID).SetTaskVideoDownload(utils.Failed).Save(ctx) if dbErr != nil { + cancel() return dbErr } - + cancel() return temporal.NewApplicationError(err.Error(), "", nil) } _, dbErr = database.DB().Client.Queue.UpdateOneID(input.Queue.ID).SetTaskVideoDownload(utils.Success).Save(ctx) if dbErr != nil { + cancel() return dbErr } + cancel() return nil } @@ -332,14 +335,14 @@ func DownloadTwitchLiveVideo(ctx context.Context, input dto.ArchiveVideoInput, c } // Create a new context with cancel - ctx, cancel := context.WithCancel(ctx) + heartbeatCtx, cancel := context.WithCancel(context.Background()) defer cancel() // Make sure to cancel when download is complete // Start a goroutine that sends a heartbeat every 30 seconds go func() { for { select { - case <-ctx.Done(): + case <-heartbeatCtx.Done(): // If the context is done, stop the goroutine return default: @@ -355,26 +358,32 @@ func DownloadTwitchLiveVideo(ctx context.Context, input dto.ArchiveVideoInput, c if err != nil { _, dbErr := database.DB().Client.Queue.UpdateOneID(input.Queue.ID).SetTaskVideoDownload(utils.Failed).Save(ctx) if dbErr != nil { + cancel() return dbErr } + cancel() return temporal.NewApplicationError(err.Error(), "", nil) } _, dbErr = database.DB().Client.Queue.UpdateOneID(input.Queue.ID).SetTaskVideoDownload(utils.Success).Save(ctx) if dbErr != nil { + cancel() return dbErr } // Update video duration with duration from downloaded video duration, err := exec.GetVideoDuration(fmt.Sprintf("/tmp/%s_%s-video.mp4", input.Vod.ExtID, input.Vod.ID)) if err != nil { + cancel() return temporal.NewApplicationError(err.Error(), "", nil) } _, dbErr = database.DB().Client.Vod.UpdateOneID(input.Vod.ID).SetDuration(duration).Save(ctx) if dbErr != nil { + cancel() return dbErr } + cancel() return nil } @@ -386,14 +395,14 @@ func PostprocessVideo(ctx context.Context, input dto.ArchiveVideoInput) error { } // Create a new context with cancel - ctx, cancel := context.WithCancel(ctx) + heartbeatCtx, cancel := context.WithCancel(context.Background()) defer cancel() // Make sure to cancel when download is complete // Start a goroutine that sends a heartbeat every 30 seconds go func() { for { select { - case <-ctx.Done(): + case <-heartbeatCtx.Done(): // If the context is done, stop the goroutine return default: @@ -409,8 +418,10 @@ func PostprocessVideo(ctx context.Context, input dto.ArchiveVideoInput) error { if err != nil { _, dbErr := database.DB().Client.Queue.UpdateOneID(input.Queue.ID).SetTaskVideoConvert(utils.Failed).Save(ctx) if dbErr != nil { + cancel() return dbErr } + cancel() return temporal.NewApplicationError(err.Error(), "", nil) } @@ -420,17 +431,21 @@ func PostprocessVideo(ctx context.Context, input dto.ArchiveVideoInput) error { if err != nil { _, dbErr := database.DB().Client.Queue.UpdateOneID(input.Queue.ID).SetTaskVideoConvert(utils.Failed).Save(ctx) if dbErr != nil { + cancel() return dbErr } + cancel() return temporal.NewApplicationError(err.Error(), "", nil) } } _, dbErr = database.DB().Client.Queue.UpdateOneID(input.Queue.ID).SetTaskVideoConvert(utils.Success).Save(ctx) if dbErr != nil { + cancel() return dbErr } + cancel() return nil } @@ -442,14 +457,14 @@ func MoveVideo(ctx context.Context, input dto.ArchiveVideoInput) error { } // Create a new context with cancel - ctx, cancel := context.WithCancel(ctx) + heartbeatCtx, cancel := context.WithCancel(context.Background()) defer cancel() // Make sure to cancel when download is complete // Start a goroutine that sends a heartbeat every 30 seconds go func() { for { select { - case <-ctx.Done(): + case <-heartbeatCtx.Done(): // If the context is done, stop the goroutine return default: @@ -467,13 +482,16 @@ func MoveVideo(ctx context.Context, input dto.ArchiveVideoInput) error { if err != nil { _, dbErr := database.DB().Client.Queue.UpdateOneID(input.Queue.ID).SetTaskVideoMove(utils.Failed).Save(ctx) if dbErr != nil { + cancel() return dbErr } + cancel() return temporal.NewApplicationError(err.Error(), "", nil) } // Update video path to hls path _, dbErr = database.DB().Client.Vod.UpdateOneID(input.Vod.ID).SetVideoPath(fmt.Sprintf("/vods/%s/%s/%s-video_hls/%s-video.m3u8", input.Channel.Name, input.Vod.FolderName, input.Vod.FileName, input.Vod.ExtID)).Save(ctx) if dbErr != nil { + cancel() return dbErr } } else { @@ -484,8 +502,10 @@ func MoveVideo(ctx context.Context, input dto.ArchiveVideoInput) error { if err != nil { _, dbErr := database.DB().Client.Queue.UpdateOneID(input.Queue.ID).SetTaskVideoMove(utils.Failed).Save(ctx) if dbErr != nil { + cancel() return dbErr } + cancel() return temporal.NewApplicationError(err.Error(), "", nil) } } @@ -499,9 +519,11 @@ func MoveVideo(ctx context.Context, input dto.ArchiveVideoInput) error { _, dbErr = database.DB().Client.Queue.UpdateOneID(input.Queue.ID).SetTaskVideoMove(utils.Success).Save(ctx) if dbErr != nil { + cancel() return dbErr } + cancel() return nil } @@ -513,14 +535,14 @@ func DownloadTwitchChat(ctx context.Context, input dto.ArchiveVideoInput) error } // Create a new context with cancel - ctx, cancel := context.WithCancel(ctx) + heartbeatCtx, cancel := context.WithCancel(context.Background()) defer cancel() // Make sure to cancel when download is complete // Start a goroutine that sends a heartbeat every 30 seconds go func() { for { select { - case <-ctx.Done(): + case <-heartbeatCtx.Done(): // If the context is done, stop the goroutine return default: @@ -536,8 +558,10 @@ func DownloadTwitchChat(ctx context.Context, input dto.ArchiveVideoInput) error if err != nil { _, dbErr := database.DB().Client.Queue.UpdateOneID(input.Queue.ID).SetTaskChatDownload(utils.Failed).Save(ctx) if dbErr != nil { + cancel() return dbErr } + cancel() return temporal.NewApplicationError(err.Error(), "", nil) } @@ -549,16 +573,20 @@ func DownloadTwitchChat(ctx context.Context, input dto.ArchiveVideoInput) error if err != nil { _, dbErr := database.DB().Client.Queue.UpdateOneID(input.Queue.ID).SetTaskChatDownload(utils.Failed).Save(ctx) if dbErr != nil { + cancel() return dbErr } + cancel() return temporal.NewApplicationError(err.Error(), "", nil) } _, dbErr = database.DB().Client.Queue.UpdateOneID(input.Queue.ID).SetTaskChatDownload(utils.Success).Save(ctx) if dbErr != nil { + cancel() return dbErr } + cancel() return nil } @@ -570,14 +598,14 @@ func DownloadTwitchLiveChat(ctx context.Context, input dto.ArchiveVideoInput) er } // Create a new context with cancel - ctx, cancel := context.WithCancel(ctx) + heartbeatCtx, cancel := context.WithCancel(context.Background()) defer cancel() // Make sure to cancel when download is complete // Start a goroutine that sends a heartbeat every 30 seconds go func() { for { select { - case <-ctx.Done(): + case <-heartbeatCtx.Done(): // If the context is done, stop the goroutine return default: @@ -593,8 +621,10 @@ func DownloadTwitchLiveChat(ctx context.Context, input dto.ArchiveVideoInput) er if err != nil { _, dbErr := database.DB().Client.Queue.UpdateOneID(input.Queue.ID).SetTaskChatDownload(utils.Failed).Save(ctx) if dbErr != nil { + cancel() return dbErr } + cancel() return temporal.NewApplicationError(err.Error(), "", nil) } @@ -606,16 +636,19 @@ func DownloadTwitchLiveChat(ctx context.Context, input dto.ArchiveVideoInput) er if err != nil { _, dbErr := database.DB().Client.Queue.UpdateOneID(input.Queue.ID).SetTaskChatDownload(utils.Failed).Save(ctx) if dbErr != nil { + cancel() return dbErr } + cancel() return temporal.NewApplicationError(err.Error(), "", nil) } _, dbErr = database.DB().Client.Queue.UpdateOneID(input.Queue.ID).SetTaskChatDownload(utils.Success).Save(ctx) if dbErr != nil { + cancel() return dbErr } - + cancel() return nil } @@ -627,14 +660,14 @@ func RenderTwitchChat(ctx context.Context, input dto.ArchiveVideoInput) error { } // Create a new context with cancel - ctx, cancel := context.WithCancel(ctx) + heartbeatCtx, cancel := context.WithCancel(context.Background()) defer cancel() // Make sure to cancel when download is complete // Start a goroutine that sends a heartbeat every 30 seconds go func() { for { select { - case <-ctx.Done(): + case <-heartbeatCtx.Done(): // If the context is done, stop the goroutine return default: @@ -650,16 +683,21 @@ func RenderTwitchChat(ctx context.Context, input dto.ArchiveVideoInput) error { if err != nil { _, dbErr := database.DB().Client.Queue.UpdateOneID(input.Queue.ID).SetTaskChatRender(utils.Failed).Save(ctx) if dbErr != nil { + cancel() return dbErr } + cancel() return temporal.NewApplicationError(err.Error(), "", nil) } _, dbErr = database.DB().Client.Queue.UpdateOneID(input.Queue.ID).SetTaskChatRender(utils.Success).Save(ctx) if dbErr != nil { + cancel() return dbErr } + cancel() + return nil } @@ -671,14 +709,14 @@ func MoveChat(ctx context.Context, input dto.ArchiveVideoInput) error { } // Create a new context with cancel - ctx, cancel := context.WithCancel(ctx) + heartbeatCtx, cancel := context.WithCancel(context.Background()) defer cancel() // Make sure to cancel when download is complete // Start a goroutine that sends a heartbeat every 30 seconds go func() { for { select { - case <-ctx.Done(): + case <-heartbeatCtx.Done(): // If the context is done, stop the goroutine return default: @@ -696,8 +734,10 @@ func MoveChat(ctx context.Context, input dto.ArchiveVideoInput) error { if err != nil { _, dbErr := database.DB().Client.Queue.UpdateOneID(input.Queue.ID).SetTaskChatMove(utils.Failed).Save(ctx) if dbErr != nil { + cancel() return dbErr } + cancel() return temporal.NewApplicationError(err.Error(), "", nil) } @@ -709,17 +749,21 @@ func MoveChat(ctx context.Context, input dto.ArchiveVideoInput) error { if err != nil { _, dbErr := database.DB().Client.Queue.UpdateOneID(input.Queue.ID).SetTaskChatMove(utils.Failed).Save(ctx) if dbErr != nil { + cancel() return dbErr } + cancel() return temporal.NewApplicationError(err.Error(), "", nil) } } _, dbErr = database.DB().Client.Queue.UpdateOneID(input.Queue.ID).SetTaskChatMove(utils.Success).Save(ctx) if dbErr != nil { + cancel() return dbErr } + cancel() return nil } @@ -761,14 +805,14 @@ func ConvertTwitchLiveChat(ctx context.Context, input dto.ArchiveVideoInput) err } // Create a new context with cancel - ctx, cancel := context.WithCancel(ctx) + heartbeatCtx, cancel := context.WithCancel(context.Background()) defer cancel() // Make sure to cancel when download is complete // Start a goroutine that sends a heartbeat every 30 seconds go func() { for { select { - case <-ctx.Done(): + case <-heartbeatCtx.Done(): // If the context is done, stop the goroutine return default: @@ -786,14 +830,16 @@ func ConvertTwitchLiveChat(ctx context.Context, input dto.ArchiveVideoInput) err // Set queue chat task to complete _, dbErr := database.DB().Client.Queue.UpdateOneID(input.Queue.ID).SetTaskChatConvert(utils.Success).SetTaskChatRender(utils.Success).SetTaskChatMove((utils.Success)).Save(ctx) if dbErr != nil { + cancel() return dbErr } // Set VOD chat to empty _, dbErr = database.DB().Client.Vod.UpdateOneID(input.Vod.ID).SetChatVideoPath("").SetChatPath("").Save(ctx) if dbErr != nil { + cancel() return dbErr } - // Check if all task are done + cancel() return nil } @@ -803,8 +849,10 @@ func ConvertTwitchLiveChat(ctx context.Context, input dto.ArchiveVideoInput) err log.Error().Err(err).Msg("error getting streamer from Twitch API") _, dbErr := database.DB().Client.Queue.UpdateOneID(input.Queue.ID).SetTaskChatConvert(utils.Failed).Save(ctx) if dbErr != nil { + cancel() return dbErr } + cancel() return temporal.NewApplicationError(err.Error(), "", nil) } cID, err := strconv.Atoi(streamer.ID) @@ -812,14 +860,17 @@ func ConvertTwitchLiveChat(ctx context.Context, input dto.ArchiveVideoInput) err log.Error().Err(err).Msg("error converting streamer ID to int") _, dbErr := database.DB().Client.Queue.UpdateOneID(input.Queue.ID).SetTaskChatConvert(utils.Failed).Save(ctx) if dbErr != nil { + cancel() return dbErr } + cancel() return temporal.NewApplicationError(err.Error(), "", nil) } // update queue item updatedQueue, dbErr := database.DB().Client.Queue.Get(ctx, input.Queue.ID) if dbErr != nil { + cancel() return dbErr } input.Queue = updatedQueue @@ -827,6 +878,7 @@ func ConvertTwitchLiveChat(ctx context.Context, input dto.ArchiveVideoInput) err // TwitchDownloader requires the ID of the video, or at least a previous video ID videos, err := twitch.GetVideosByUser(streamer.ID, "archive") if err != nil { + cancel() return temporal.NewApplicationError(err.Error(), "", nil) } @@ -839,6 +891,7 @@ func ConvertTwitchLiveChat(ctx context.Context, input dto.ArchiveVideoInput) err } // If no previous video ID was found, use the current video ID if previousVideoID == "" { + cancel() log.Warn().Msgf("Unable to convert chat due to no previous video IDs") // TODO: exit gracefully } @@ -848,8 +901,10 @@ func ConvertTwitchLiveChat(ctx context.Context, input dto.ArchiveVideoInput) err log.Error().Err(err).Msg("error converting chat") _, dbErr := database.DB().Client.Queue.UpdateOneID(input.Queue.ID).SetTaskChatConvert(utils.Failed).Save(ctx) if dbErr != nil { + cancel() return dbErr } + cancel() return temporal.NewApplicationError(err.Error(), "", nil) } @@ -860,8 +915,10 @@ func ConvertTwitchLiveChat(ctx context.Context, input dto.ArchiveVideoInput) err log.Error().Err(err).Msg("error updating chat") _, dbErr := database.DB().Client.Queue.UpdateOneID(input.Queue.ID).SetTaskChatConvert(utils.Failed).Save(ctx) if dbErr != nil { + cancel() return dbErr } + cancel() return temporal.NewApplicationError(err.Error(), "", nil) } @@ -874,29 +931,33 @@ func ConvertTwitchLiveChat(ctx context.Context, input dto.ArchiveVideoInput) err log.Error().Err(err).Msg("error copying chat convert") _, dbErr := database.DB().Client.Queue.UpdateOneID(input.Queue.ID).SetTaskChatConvert(utils.Failed).Save(ctx) if dbErr != nil { + cancel() return dbErr } + cancel() return temporal.NewApplicationError(err.Error(), "", nil) } _, dbErr = database.DB().Client.Queue.UpdateOneID(input.Queue.ID).SetTaskChatConvert(utils.Success).Save(ctx) if dbErr != nil { + cancel() return dbErr } + cancel() return nil } func TwitchSaveVideoChapters(ctx context.Context) error { // Create a new context with cancel - ctx, cancel := context.WithCancel(ctx) + heartbeatCtx, cancel := context.WithCancel(context.Background()) defer cancel() // Make sure to cancel when download is complete // Start a goroutine that sends a heartbeat every 30 seconds go func() { for { select { - case <-ctx.Done(): + case <-heartbeatCtx.Done(): // If the context is done, stop the goroutine return default: @@ -910,6 +971,7 @@ func TwitchSaveVideoChapters(ctx context.Context) error { // get all videos videos, err := database.DB().Client.Vod.Query().All(ctx) if err != nil { + cancel() return temporal.NewApplicationError(err.Error(), "", nil) } @@ -954,6 +1016,7 @@ func TwitchSaveVideoChapters(ctx context.Context) error { for _, c := range chapters { _, err := chapterService.CreateChapter(c, video.ID) if err != nil { + cancel() return temporal.NewApplicationError(err.Error(), "", nil) } } @@ -962,6 +1025,6 @@ func TwitchSaveVideoChapters(ctx context.Context) error { // sleep for 0.25 seconds to not hit rate limit time.Sleep(250 * time.Millisecond) } - + cancel() return nil }