From 0a45cf4ebb4c4be229c68a6aafbd21cf386342e6 Mon Sep 17 00:00:00 2001
From: sayedppqq <sayed@appscode.com>
Date: Wed, 18 Dec 2024 19:12:51 +0600
Subject: [PATCH] mongo: record oplog push log history in snapshot status

Replace the single WalSegments entry in the snapshot component status
with LogStats: per-archive success/failure log entries with start and
end timestamps, bounded by the new
--snapshot-successful-log-history-limit and
--snapshot-failed-log-history-limit flags.

Signed-off-by: sayedppqq <sayed@appscode.com>
---
 cmd/mongo/oplog_push.go                    | 43 +++++++----
 internal/databases/mongo/archive/loader.go | 86 ++++++++++++++++++----
 2 files changed, 98 insertions(+), 31 deletions(-)

diff --git a/cmd/mongo/oplog_push.go b/cmd/mongo/oplog_push.go
index b3df98ff4..3db23b0dd 100644
--- a/cmd/mongo/oplog_push.go
+++ b/cmd/mongo/oplog_push.go
@@ -27,9 +27,11 @@ import (
 )
 
 var (
-	snapshotName      string
-	snapshotNamespace string
-	kubeconfig        string
+	kubeconfig                        string
+	snapshotName                      string
+	snapshotNamespace                 string
+	snapshotSuccessfulLogHistoryLimit string
+	snapshotFailedLogHistoryLimit     string
 )
 
 // oplogPushCmd represents the continuous oplog archiving procedure
@@ -60,13 +62,19 @@ var oplogPushCmd = &cobra.Command{
 }
 
 func init() {
+	oplogPushCmd.PersistentFlags().StringVarP(
+		&kubeconfig, "kubeconfig", "", "", "Path to the kubeconfig file")
 	cmd.AddCommand(oplogPushCmd)
 	oplogPushCmd.PersistentFlags().StringVarP(
 		&snapshotName, "snapshot-name", "", "", "Name of the snapshot")
 	oplogPushCmd.PersistentFlags().StringVarP(
 		&snapshotNamespace, "snapshot-namespace", "n", "", "Namespace of the snapshot")
 	oplogPushCmd.PersistentFlags().StringVarP(
-		&kubeconfig, "kubeconfig", "", "", "Path of the kubeconfig")
+		&snapshotSuccessfulLogHistoryLimit, "snapshot-successful-log-history-limit", "", "",
+		"Maximum number of successful log entries retained in the snapshot status")
+	oplogPushCmd.PersistentFlags().StringVarP(
+		&snapshotFailedLogHistoryLimit, "snapshot-failed-log-history-limit", "", "",
+		"Maximum number of failed log entries retained in the snapshot status")
 }
 
 func runOplogPush(ctx context.Context, pushArgs oplogPushRunArgs, statsArgs oplogPushStatsArgs) error {
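
The two new limits are registered as strings and parsed later in SetupSnapshot. For comparison, a minimal standalone sketch (hypothetical demo command and an assumed default of 5, not taken from this patch) of letting cobra/pflag parse them as integers directly:

package main

import (
	"fmt"

	"github.com/spf13/cobra"
)

func main() {
	var successfulLimit, failedLimit int
	demo := &cobra.Command{
		Use: "oplog-push-demo", // hypothetical command for illustration
		Run: func(cmd *cobra.Command, args []string) {
			fmt.Println("limits:", successfulLimit, failedLimit)
		},
	}
	// IntVar makes flag parsing itself reject non-integer values,
	// removing the need for a strconv.Atoi round-trip later.
	demo.Flags().IntVar(&successfulLimit, "snapshot-successful-log-history-limit", 5,
		"Maximum number of successful log entries retained in the snapshot status")
	demo.Flags().IntVar(&failedLimit, "snapshot-failed-log-history-limit", 5,
		"Maximum number of failed log entries retained in the snapshot status")
	if err := demo.Execute(); err != nil {
		fmt.Println(err)
	}
}
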
@@ -83,8 +91,11 @@ func runOplogPush(ctx context.Context, pushArgs oplogPushRunArgs, statsArgs oplo
 	uplProvider.ChangeDirectory(subDir)
 	uploader := archive.NewStorageUploader(uplProvider)
 	uploader.SetKubeClient(pushArgs.kubeClient)
-	uploader.SetSnapshot(snapshotName, snapshotNamespace)
 	uploader.SetDBNode(pushArgs.dbNode)
+	err = uploader.SetupSnapshot(snapshotName, snapshotNamespace, snapshotSuccessfulLogHistoryLimit, snapshotFailedLogHistoryLimit)
+	if err != nil {
+		return err
+	}
 
 	// set up mongodb client and oplog fetcher
 	mongoClient, err := client.NewMongoClient(ctx, pushArgs.mongodbURL)
@@ -149,16 +160,18 @@ func runOplogPush(ctx context.Context, pushArgs oplogPushRunArgs, statsArgs oplo
 }
 
 type oplogPushRunArgs struct {
-	archiveAfterSize   int
-	archiveTimeout     time.Duration
-	mongodbURL         string
-	dbNode             string
-	dbProvider         string
-	dbPath             string
-	primaryWait        bool
-	primaryWaitTimeout time.Duration
-	lwUpdate           time.Duration
-	kubeClient         controllerclient.Client
+	archiveAfterSize     int
+	archiveTimeout       time.Duration
+	mongodbURL           string
+	dbNode               string
+	dbProvider           string
+	dbPath               string
+	successfulLogHistory string
+	failedLogHistory     string
+	primaryWait          bool
+	primaryWaitTimeout   time.Duration
+	lwUpdate             time.Duration
+	kubeClient           controllerclient.Client
 }
 
 func buildOplogPushRunArgs() (args oplogPushRunArgs, err error) {
diff --git a/internal/databases/mongo/archive/loader.go b/internal/databases/mongo/archive/loader.go
index 3f8096894..5ef5c0c68 100644
--- a/internal/databases/mongo/archive/loader.go
+++ b/internal/databases/mongo/archive/loader.go
@@ -6,6 +6,7 @@ import (
 	"fmt"
 	"io"
 	"sort"
+	"strconv"
 	"strings"
 	"time"
 
@@ -222,10 +223,13 @@ type StorageUploader struct {
 	crypter crypto.Crypter // usages only in UploadOplogArchive
 	buf     *bytes.Buffer
 
-	kubeClient        controllerruntime.Client
-	snapshotName      string
-	snapshotNamespace string
-	dbNode            string
+	kubeClient controllerruntime.Client
+	dbNode     string
+
+	snapshotName                      string
+	snapshotNamespace                 string
+	snapshotSuccessfulLogHistoryLimit int
+	snapshotFailedLogHistoryLimit     int
 }
 
 // NewStorageUploader builds mongodb uploader.
@@ -238,9 +242,16 @@ func (su *StorageUploader) SetKubeClient(client controllerruntime.Client) {
 	su.kubeClient = client
 }
 
-func (su *StorageUploader) SetSnapshot(name, namespace string) {
+func (su *StorageUploader) SetupSnapshot(name, namespace, successfulLog, failedLog string) error {
+	var err error
 	su.snapshotName = name
 	su.snapshotNamespace = namespace
+	su.snapshotSuccessfulLogHistoryLimit, err = strconv.Atoi(successfulLog)
+	if err != nil {
+		return err
+	}
+	su.snapshotFailedLogHistoryLimit, err = strconv.Atoi(failedLog)
+	return err
 }
 
 func (su *StorageUploader) SetDBNode(node string) {
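
Note that SetupSnapshot above feeds both limit flags through strconv.Atoi, so an empty or non-numeric value makes the push fail before any oplog is read; the flags are effectively required. A runnable sketch of that parsing contract (parseLimit is an illustrative helper, not from the patch):

package main

import (
	"fmt"
	"strconv"
)

// parseLimit mirrors what SetupSnapshot does with each history-limit
// flag: the raw string must be a whole base-10 number.
func parseLimit(flagName, raw string) (int, error) {
	n, err := strconv.Atoi(raw)
	if err != nil {
		return 0, fmt.Errorf("invalid %s %q: %w", flagName, raw, err)
	}
	return n, nil
}

func main() {
	fmt.Println(parseLimit("snapshot-successful-log-history-limit", "5"))
	fmt.Println(parseLimit("snapshot-failed-log-history-limit", ""))
	fmt.Println(parseLimit("snapshot-failed-log-history-limit", "five"))
}
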
@@ -250,8 +261,9 @@ func (su *StorageUploader) GetDBNode() string {
 	return su.dbNode
 }
 
-func (su *StorageUploader) updateSnapshot(firstTS, lastTS models.Timestamp) error {
+func (su *StorageUploader) updateSnapshot(firstTS, lastTS models.Timestamp, uploadErr error, arch models.Archive) error {
 	var snapshot storageapi.Snapshot
+	archFileName := arch.DBNodeSpecificFileName(su.dbNode)
 	err := su.kubeClient.Get(context.TODO(), controllerruntime.ObjectKey{
 		Namespace: su.snapshotNamespace,
 		Name:      su.snapshotName,
@@ -272,14 +284,29 @@ func (su *StorageUploader) updateSnapshot(firstTS, lastTS models.Timestamp) erro
 				in.Status.Components = make(map[string]storageapi.Component)
 			}
 			if _, ok := in.Status.Components[compName]; !ok {
-				walSegments := make([]storageapi.WalSegment, 1)
-				walSegments[0].Start = &metav1.Time{Time: time.Unix(int64(firstTS.ToBsonTS().T), 0)}
+
+				// Create the component with empty LogStats and, for a
+				// successful upload, the start time of its first archive.
+				// The shared path below appends the success/failure log
+				// entry for this archive exactly once.
+				logStats := new(storageapi.LogStats)
+				if uploadErr == nil {
+					startTime := (&metav1.Time{Time: time.Unix(int64(firstTS.ToBsonTS().T), 0)}).String()
+					logStats.Start = &startTime
+				}
 				in.Status.Components[compName] = storageapi.Component{
-					WalSegments: walSegments,
+					LogStats: logStats,
 				}
 			}
 			component := in.Status.Components[compName]
-			component.WalSegments[0].End = &metav1.Time{Time: time.Unix(int64(lastTS.ToBsonTS().T), 0)}
+
+			if uploadErr != nil {
+				su.updateLogStatsLog(component.LogStats, fmt.Sprintf("failed to push archive %s: %v", archFileName, uploadErr), arch)
+			} else {
+				su.updateLogStatsLog(component.LogStats, "", arch)
+				endTime := (&metav1.Time{Time: time.Unix(int64(lastTS.ToBsonTS().T), 0)}).String()
+				component.LogStats.End = &endTime
+			}
 			in.Status.Components[compName] = component
 
 			return in
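
The Start and End fields recorded above are strings produced from the oplog's BSON timestamps. A small standalone sketch of that conversion (the sample timestamp value is arbitrary; metav1 is the standard k8s.io/apimachinery package):

package main

import (
	"fmt"
	"time"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func main() {
	var seconds uint32 = 1734527571 // stand-in for firstTS.ToBsonTS().T
	// metav1.Time embeds time.Time, so String() renders the wall-clock
	// form that ends up in the snapshot status.
	start := (&metav1.Time{Time: time.Unix(int64(seconds), 0)}).String()
	fmt.Println(start)
}
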
@@ -288,13 +315,35 @@ func (su *StorageUploader) updateSnapshot(firstTS, lastTS models.Timestamp) erro
 	return err
 }
 
-// UploadOplogArchive compresses a stream and uploads it with given archive name.
-func (su *StorageUploader) UploadOplogArchive(ctx context.Context, stream io.Reader, firstTS, lastTS models.Timestamp) error {
-	err := su.updateSnapshot(firstTS, lastTS)
-	if err != nil {
-		return fmt.Errorf("failed to update snapshot: %w", err)
+func (su *StorageUploader) updateLogStatsLog(logStats *storageapi.LogStats, errMsg string, arch models.Archive) {
+	// Successful upload: append to the succeeded history and trim it to the configured limit.
+	if errMsg == "" {
+		logStats.TotalSucceededCount++
+		logStats.LastSucceededStats = append(logStats.LastSucceededStats, getLog(errMsg, arch))
+		if len(logStats.LastSucceededStats) > su.snapshotSuccessfulLogHistoryLimit {
+			logStats.LastSucceededStats = logStats.LastSucceededStats[1:]
+		}
+	} else {
+		logStats.TotalFailedCount++
+		logStats.LastFailedStats = append(logStats.LastFailedStats, getLog(errMsg, arch))
+		if len(logStats.LastFailedStats) > su.snapshotFailedLogHistoryLimit {
+			logStats.LastFailedStats = logStats.LastFailedStats[1:]
+		}
 	}
+}
 
+func getLog(msg string, arch models.Archive) storageapi.Log {
+	startTime := (&metav1.Time{Time: time.Unix(int64(arch.Start.ToBsonTS().T), 0)}).String()
+	endTime := (&metav1.Time{Time: time.Unix(int64(arch.End.ToBsonTS().T), 0)}).String()
+	return storageapi.Log{
+		Start: &startTime,
+		End:   &endTime,
+		Error: msg,
+	}
+}
+
+// UploadOplogArchive compresses a stream and uploads it under the given archive name.
+func (su *StorageUploader) UploadOplogArchive(ctx context.Context, stream io.Reader, firstTS, lastTS models.Timestamp) error {
 	arch, err := models.NewArchive(firstTS, lastTS, su.Compression().FileExtension(), models.ArchiveTypeOplog)
 	if err != nil {
 		return fmt.Errorf("can not build archive: %w", err)
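
updateLogStatsLog above treats each history as a fixed-size FIFO window: append the newest entry, then drop the oldest one once the slice exceeds the configured limit. A self-contained sketch of that trimming behavior (plain strings stand in for storageapi.Log values):

package main

import "fmt"

// appendBounded appends an entry and evicts the oldest one when the
// history grows past limit, mirroring the trimming in updateLogStatsLog.
func appendBounded(history []string, entry string, limit int) []string {
	history = append(history, entry)
	if len(history) > limit {
		history = history[1:]
	}
	return history
}

func main() {
	var h []string
	for i := 1; i <= 5; i++ {
		h = appendBounded(h, fmt.Sprintf("archive-%d", i), 3)
	}
	fmt.Println(h) // [archive-3 archive-4 archive-5]
}
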
@@ -308,7 +357,12 @@ func (su *StorageUploader) UploadOplogArchive(ctx context.Context, stream io.Rea
 	}
 	fileName := arch.DBNodeSpecificFileName(su.dbNode)
 	// providing io.ReaderAt+io.ReadSeeker to s3 upload enables buffer pool usage
-	return su.Upload(ctx, fileName, bytes.NewReader(su.buf.Bytes()))
+	uploadErr := su.Upload(ctx, fileName, bytes.NewReader(su.buf.Bytes()))
+	err = su.updateSnapshot(firstTS, lastTS, uploadErr, arch)
+	if err != nil {
+		return fmt.Errorf("failed to update snapshot: %w (upload error: %v)", err, uploadErr)
+	}
+	return uploadErr
 }
 
 // UploadGap uploads mark indicating archiving gap.
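
UploadOplogArchive now has two independent failure sources: the upload itself and the snapshot status update. The return path above folds the upload error into the wrapped message; an alternative standalone sketch (assuming Go 1.20+, not what the patch does) combines them with errors.Join, which silently drops nil members:

package main

import (
	"errors"
	"fmt"
)

func main() {
	uploadErr := errors.New("upload failed")
	updateErr := errors.New("failed to update snapshot: conflict")

	// A nil member is skipped, so a lone failure reports cleanly.
	fmt.Println(errors.Join(updateErr, nil))
	// Both failures are reported, separated by a newline.
	fmt.Println(errors.Join(updateErr, uploadErr))
}
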