From 592a3bfb3932f6cbb43123208d1d47b2c999489a Mon Sep 17 00:00:00 2001
From: jayjiahua <553544693@qq.com>
Date: Wed, 14 Jun 2023 14:19:41 +0800
Subject: [PATCH 1/2] feat: change redis password file format to json

---
 filebeat/input/redis/input.go | 75 +++++++++++++++++++++++++----------
 1 file changed, 54 insertions(+), 21 deletions(-)

diff --git a/filebeat/input/redis/input.go b/filebeat/input/redis/input.go
index 5b49fa9b7069..88745dd512c6 100644
--- a/filebeat/input/redis/input.go
+++ b/filebeat/input/redis/input.go
@@ -18,7 +18,9 @@
 package redis
 
 import (
+	"encoding/json"
 	"os"
+	"strings"
 	"time"
 
 	rd "github.com/garyburd/redigo/redis"
@@ -41,11 +43,37 @@ func init() {
 
 // Input is a input for redis
 type Input struct {
-	started  bool
-	outlet   channel.Outleter
-	config   config
-	cfg      *common.Config
-	registry *harvester.Registry
+	started     bool
+	outlet      channel.Outleter
+	config      config
+	cfg         *common.Config
+	registry    *harvester.Registry
+	passwordMap map[string]string
+}
+
+// loadPwdFile reads the redis password file and returns the password map
+// sample file: https://github.com/oliver006/redis_exporter/blob/master/contrib/sample-pwd-file.json
+func loadPwdFile(passwordFile string) (map[string]string, error) {
+	res := make(map[string]string)
+
+	logp.Debug("start load password file: %s", passwordFile)
+	bytes, err := os.ReadFile(passwordFile)
+	if err != nil {
+		logp.Warn("load password file failed: %s", err)
+		return nil, err
+	}
+	err = json.Unmarshal(bytes, &res)
+	if err != nil {
+		logp.Warn("password file format error: %s", err)
+		return nil, err
+	}
+
+	logp.Info("Loaded %d entries from %s", len(res), passwordFile)
+	for k := range res {
+		logp.Debug("%s", k)
+	}
+
+	return res, nil
 }
 
 // NewInput creates a new redis input
@@ -60,32 +88,26 @@ func NewInput(cfg *common.Config, outletFactory channel.Connector, context input
 	}
 
 	// 读取文件内容
-	var content = ""
+	passwordMap := make(map[string]string)
 	if config.PasswordFile != "" {
-		info, err := os.ReadFile(config.PasswordFile)
+		passwordMap, err = loadPwdFile(config.PasswordFile)
 		if err != nil {
-			logp.Err("Read Password File Error: %s", err)
-		} else {
-			content = string(info)
+			logp.Err("Error loading redis passwords from file %s, err: %s", config.PasswordFile, err)
 		}
 	}
 
-	// 将文件内容赋值给 config.Password
-	if content != "" {
-		config.Password = content
-	}
-
 	outlet, err := outletFactory(cfg, context.DynamicFields)
 	if err != nil {
 		return nil, err
 	}
 
 	p := &Input{
-		started:  false,
-		outlet:   outlet,
-		config:   config,
-		cfg:      cfg,
-		registry: harvester.NewRegistry(),
+		started:     false,
+		outlet:      outlet,
+		config:      config,
+		cfg:         cfg,
+		registry:    harvester.NewRegistry(),
+		passwordMap: passwordMap,
 	}
 
 	return p, nil
@@ -107,7 +129,18 @@ func (p *Input) Run() {
 
 	forwarder := harvester.NewForwarder(p.outlet)
 	for _, host := range p.config.Hosts {
-		pool := CreatePool(host, p.config.Password, p.config.Network,
+		uri := host
+		if !strings.Contains(uri, "://") {
+			uri = "redis://" + uri
+		}
+
+		// 判断 password file 中是否存在该域名的密码配置,如果不存在,则使用默认密码
+		password, ok := p.passwordMap[uri]
+		if !ok {
+			password = p.config.Password
+		}
+
+		pool := CreatePool(host, password, p.config.Network,
 			p.config.MaxConn, p.config.IdleTimeout, p.config.IdleTimeout)
 
 		h, err := NewHarvester(pool.Get())

From b345ef2ee5fb9f6176570d83ded8d8f46ca004df Mon Sep 17 00:00:00 2001
From: jayjiahua <553544693@qq.com>
Date: Thu, 15 Jun 2023 20:29:04 +0800
Subject: [PATCH 2/2] =?UTF-8?q?fix:=20redis=20=E6=85=A2=E6=97=A5=E5=BF=97?=
 =?UTF-8?q?=E8=BF=9B=E5=BA=A6=E6=96=87=E4=BB=B6=E6=9B=B4=E6=96=B0=E6=8A=A5?=
 =?UTF-8?q?=E9=94=99=E9=97=AE=E9=A2=98=E4=BF=AE=E5=A4=8D?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 filebeat/input/redis/harvester.go | 8 ++++++++
 1 file changed, 8 insertions(+)

diff --git a/filebeat/input/redis/harvester.go b/filebeat/input/redis/harvester.go
index 27c53157ed27..043d04c2c1d1 100644
--- a/filebeat/input/redis/harvester.go
+++ b/filebeat/input/redis/harvester.go
@@ -30,6 +30,7 @@ import (
 	"github.com/elastic/beats/libbeat/logp"
 
 	"github.com/elastic/beats/filebeat/harvester"
+	"github.com/elastic/beats/filebeat/input/file"
 	"github.com/elastic/beats/filebeat/util"
 )
 
@@ -162,6 +163,12 @@ func (h *Harvester) Run() error {
 			},
 		}
 
+		// 设置为完成状态,避免进度文件回写报错
+		data.SetState(file.State{
+			Type: "redis",
+			Finished: true,
+		})
+
 		h.forwarder.Send(data)
 	}
 	return nil
@@ -176,3 +183,4 @@ func (h *Harvester) Stop() {
 func (h *Harvester) ID() uuid.UUID {
 	return h.id
 }
+