From 25f774a1649ee2b569989ef15be310f7a3ec323e Mon Sep 17 00:00:00 2001
From: msaf1980 <msaf1980@gmail.com>
Date: Tue, 25 Apr 2023 13:32:47 +0500
Subject: [PATCH] sd: add cleanup for dead hosts

---
 graphite-clickhouse.go | 16 +++++++++--
 sd/nginx/nginx.go      | 35 +++++++++++++++++++----
 sd/nginx/nginx_test.go | 65 +++++++++++++++++++++++++++++++++---------
 sd/utils/utils.go      |  1 +
 4 files changed, 94 insertions(+), 23 deletions(-)

diff --git a/graphite-clickhouse.go b/graphite-clickhouse.go
index d40854854..dcbac146c 100644
--- a/graphite-clickhouse.go
+++ b/graphite-clickhouse.go
@@ -96,6 +96,7 @@ func main() {
 	pprof := flag.String("pprof", "", "Additional pprof listen addr for non-server modes (tagger, etc..), overrides pprof-listen from common ")
 
 	sdList := flag.Bool("sd-list", false, "List registered nodes in SD")
+	sdClean := flag.Bool("sd-clean", false, "Cleanup registered nodes in SD")
 
 	printVersion := flag.Bool("version", false, "Print version")
 	verbose := flag.Bool("verbose", false, "Verbose (print config on startup)")
@@ -124,7 +125,7 @@ func main() {
 		return
 	}
 
-	if *sdList {
+	if *sdList || *sdClean {
 		if cfg.Common.SD != "" && cfg.NeedLoadAvgColect() {
 			var sd sd.SD
 			logger := zapwriter.Default()
@@ -132,11 +133,20 @@ func main() {
 			case config.SDNginx:
 				sd = nginx.New(cfg.Common.SD, cfg.Common.SDNamespace, "", logger)
 			default:
-				panic("serive discovery type not registered")
+				panic("service discovery type not registered")
 			}
+			ts := time.Now().Unix() - 3600
 			if nodes, err := sd.Nodes(); err == nil {
 				for _, node := range nodes {
-					fmt.Printf("%s: %s\n", node.Key, node.Value)
+					if *sdClean && node.Flags > 0 {
+						if ts > node.Flags {
+							fmt.Printf("%s: %s (%s), deleted\n", node.Key, node.Value, time.Unix(node.Flags, 0).UTC().Format(time.RFC3339Nano))
+						} else {
+							fmt.Printf("%s: %s (%s)\n", node.Key, node.Value, time.Unix(node.Flags, 0).UTC().Format(time.RFC3339Nano))
+						}
+					} else {
+						fmt.Printf("%s: %s (%s)\n", node.Key, node.Value, time.Unix(node.Flags, 0).UTC().Format(time.RFC3339Nano))
+					}
 				}
 			} else {
 				log.Fatal(err)
diff --git a/sd/nginx/nginx.go b/sd/nginx/nginx.go
index 39ae4810d..aa841fe8a 100644
--- a/sd/nginx/nginx.go
+++ b/sd/nginx/nginx.go
@@ -5,6 +5,7 @@ import (
 	"errors"
 	"strconv"
 	"strings"
+	"time"
 
 	"github.com/lomik/graphite-clickhouse/sd/utils"
 
@@ -17,6 +18,8 @@ var (
 	json          = jsoniter.ConfigCompatibleWithStandardLibrary
 	ErrNoKey      = errors.New("list key no found")
 	ErrInvalidKey = errors.New("list key is invalid")
+
+	timeNow = time.Now
 )
 
 func splitNode(node string) (dc, host, listen string, ok bool) {
@@ -51,7 +54,7 @@ func New(url, namespace, hostname string, logger *zap.Logger) *Nginx {
 	sd := &Nginx{
 		logger:     logger,
 		body:       make([]byte, 128),
-		backupBody: []byte(`{"backup":1, "max_fails":0}`),
+		backupBody: []byte(`{"backup":1,"max_fails":0}`),
 		nsEnd:      "upstreams/" + namespace + "/",
 		hostname:   hostname,
 	}
@@ -188,19 +191,27 @@ func (sd *Nginx) Nodes() (nodes []utils.KV, err error) {
 				if s, ok := i.(string); ok {
 					if strings.HasPrefix(s, sd.nsEnd) {
 						s = s[len(sd.nsEnd):]
+						kv := utils.KV{Key: s}
 						if i, ok := jNode["Value"]; ok {
 							if v, ok := i.(string); ok {
 								d, err := base64.StdEncoding.DecodeString(v)
 								if err != nil {
 									return nil, err
 								}
-								nodes = append(nodes, utils.KV{Key: s, Value: stringutils.UnsafeString(d)})
-							} else {
-								nodes = append(nodes, utils.KV{Key: s, Value: ""})
+								kv.Value = stringutils.UnsafeString(d)
+							}
+						}
+						if i, ok := jNode["Flags"]; ok {
+							switch v := i.(type) {
+							case float64:
+								kv.Flags = int64(v)
+							case int:
+								kv.Flags = int64(v)
+							case int64:
+								kv.Flags = v
 							}
-						} else {
-							nodes = append(nodes, utils.KV{Key: s, Value: ""})
 						}
+						nodes = append(nodes, kv)
 					} else {
 						return nil, ErrInvalidKey
 					}
@@ -227,11 +238,20 @@ func (sd *Nginx) update(ip, port string, dc []string) (err error) {
 		}
 		sd.url.WriteString(port)
 
+		// add custom query flags
+		sd.url.WriteByte('?')
+		sd.url.WriteString("flags=")
+		sd.url.WriteInt(timeNow().Unix(), 10)
+
 		if err = utils.HttpPut(sd.url.String(), sd.body); err != nil {
 			sd.logger.Error("put", zap.String("address", sd.url.String()[sd.pos:]), zap.Error(err))
 			return
 		}
 	} else {
+		flags := make([]byte, 0, 32)
+		flags = append(flags, "?flags="...)
+		flags = strconv.AppendInt(flags, timeNow().Unix(), 10)
+
 		for i := 0; i < len(dc); i++ {
 			// cfg.Common.SDDc
 			sd.url.Truncate(sd.pos)
@@ -245,6 +265,9 @@ func (sd *Nginx) update(ip, port string, dc []string) (err error) {
 			}
 			sd.url.WriteString(port)
 
+			// add custom query flags
+			sd.url.Write(flags)
+
 			if i == 0 {
 				if nErr := utils.HttpPut(sd.url.String(), sd.body); nErr != nil {
 					sd.logger.Error(
diff --git a/sd/nginx/nginx_test.go b/sd/nginx/nginx_test.go
index d63e51f2b..076c4736e 100644
--- a/sd/nginx/nginx_test.go
+++ b/sd/nginx/nginx_test.go
@@ -6,6 +6,7 @@ package nginx
 import (
 	"sort"
 	"testing"
+	"time"
 
 	"github.com/lomik/graphite-clickhouse/sd/utils"
 	"github.com/lomik/zapwriter"
@@ -27,6 +28,10 @@ var (
 )
 
 func TestNginx(t *testing.T) {
+	timeNow = func() time.Time {
+		return time.Unix(1682408721, 0)
+	}
+
 	logger := zapwriter.Default()
 
 	sd1 := New("http://127.0.0.1:8500/v1/kv/upstreams", "graphite", hostname1, logger)
@@ -126,6 +131,17 @@ func TestNginx(t *testing.T) {
 			"_/test_host2/192.168.0.1:9090":  `{"weight":25,"max_fails":0}`,
 		}, nodesMap,
 	)
+
+	nodesV, err := sd2.Nodes()
+	require.NoError(t, err)
+	assert.Equal(
+		t, []utils.KV{
+			{Key: "_/test_host1/192.168.0.1:9090", Value: `{"weight":10,"max_fails":0}`, Flags: 1682408721},
+			{Key: "_/test_host2/192.168.0.1:9090", Value: `{"weight":25,"max_fails":0}`, Flags: 1682408721},
+			{Key: "_/test_host2/192.168.1.25:9090", Value: `{"weight":25,"max_fails":0}`, Flags: 1682408721},
+		}, nodesV,
+	)
+
 	require.NoError(t, sd2.Clear(ip2, port))
 	nodesMap, err = sd2.ListMap()
 	require.NoError(t, err)
@@ -148,6 +164,10 @@ func TestNginx(t *testing.T) {
 }
 
 func TestNginxDC(t *testing.T) {
+	timeNow = func() time.Time {
+		return time.Unix(1682408721, 0)
+	}
+
 	logger := zapwriter.Default()
 
 	sd1 := New("http://127.0.0.1:8500/v1/kv/upstreams", "graphite", hostname1, logger)
@@ -182,8 +202,8 @@ func TestNginxDC(t *testing.T) {
 	assert.Equal(
 		t, map[string]string{
 			"dc1/test_host1/192.168.0.1:9090": `{"weight":10,"max_fails":0}`,
-			"dc2/test_host1/192.168.0.1:9090": `{"backup":1, "max_fails":0}`,
-			"dc3/test_host1/192.168.0.1:9090": `{"backup":1, "max_fails":0}`,
+			"dc2/test_host1/192.168.0.1:9090": `{"backup":1,"max_fails":0}`,
+			"dc3/test_host1/192.168.0.1:9090": `{"backup":1,"max_fails":0}`,
 		}, nodesMap,
 	)
 
@@ -205,8 +225,8 @@ func TestNginxDC(t *testing.T) {
 	assert.Equal(
 		t, map[string]string{
 			"dc2/test_host2/192.168.1.25:9090": `{"weight":21,"max_fails":0}`,
-			"dc1/test_host2/192.168.1.25:9090": `{"backup":1, "max_fails":0}`,
-			"dc3/test_host2/192.168.1.25:9090": `{"backup":1, "max_fails":0}`,
+			"dc1/test_host2/192.168.1.25:9090": `{"backup":1,"max_fails":0}`,
+			"dc3/test_host2/192.168.1.25:9090": `{"backup":1,"max_fails":0}`,
 		}, nodesMap,
 	)
 
@@ -228,8 +248,8 @@ func TestNginxDC(t *testing.T) {
 	assert.Equal(
 		t, map[string]string{
 			"dc2/test_host2/192.168.1.25:9090": `{"weight":25,"max_fails":0}`,
-			"dc1/test_host2/192.168.1.25:9090": `{"backup":1, "max_fails":0}`,
-			"dc3/test_host2/192.168.1.25:9090": `{"backup":1, "max_fails":0}`,
+			"dc1/test_host2/192.168.1.25:9090": `{"backup":1,"max_fails":0}`,
+			"dc3/test_host2/192.168.1.25:9090": `{"backup":1,"max_fails":0}`,
 		}, nodesMap,
 	)
 
@@ -245,8 +265,8 @@ func TestNginxDC(t *testing.T) {
 	assert.Equal(
 		t, map[string]string{
 			"dc1/test_host1/192.168.0.1:9090": `{"weight":10,"max_fails":0}`,
-			"dc2/test_host1/192.168.0.1:9090": `{"backup":1, "max_fails":0}`,
-			"dc3/test_host1/192.168.0.1:9090": `{"backup":1, "max_fails":0}`,
+			"dc2/test_host1/192.168.0.1:9090": `{"backup":1,"max_fails":0}`,
+			"dc3/test_host1/192.168.0.1:9090": `{"backup":1,"max_fails":0}`,
 		}, nodesMap,
 	)
 
@@ -258,21 +278,38 @@ func TestNginxDC(t *testing.T) {
 	assert.Equal(
 		t, map[string]string{
 			"dc2/test_host2/192.168.1.25:9090": `{"weight":25,"max_fails":0}`,
-			"dc1/test_host2/192.168.1.25:9090": `{"backup":1, "max_fails":0}`,
-			"dc3/test_host2/192.168.1.25:9090": `{"backup":1, "max_fails":0}`,
+			"dc1/test_host2/192.168.1.25:9090": `{"backup":1,"max_fails":0}`,
+			"dc3/test_host2/192.168.1.25:9090": `{"backup":1,"max_fails":0}`,
 			"dc2/test_host2/192.168.0.1:9090":  `{"weight":25,"max_fails":0}`,
-			"dc1/test_host2/192.168.0.1:9090":  `{"backup":1, "max_fails":0}`,
-			"dc3/test_host2/192.168.0.1:9090":  `{"backup":1, "max_fails":0}`,
+			"dc1/test_host2/192.168.0.1:9090":  `{"backup":1,"max_fails":0}`,
+			"dc3/test_host2/192.168.0.1:9090":  `{"backup":1,"max_fails":0}`,
 		}, nodesMap,
 	)
+
+	nodesV, err := sd2.Nodes()
+	require.NoError(t, err)
+	assert.Equal(
+		t, []utils.KV{
+			{Key: "dc1/test_host1/192.168.0.1:9090", Value: `{"weight":10,"max_fails":0}`, Flags: 1682408721},
+			{Key: "dc1/test_host2/192.168.0.1:9090", Value: `{"backup":1,"max_fails":0}`, Flags: 1682408721},
+			{Key: "dc1/test_host2/192.168.1.25:9090", Value: `{"backup":1,"max_fails":0}`, Flags: 1682408721},
+			{Key: "dc2/test_host1/192.168.0.1:9090", Value: `{"backup":1,"max_fails":0}`, Flags: 1682408721},
+			{Key: "dc2/test_host2/192.168.0.1:9090", Value: `{"weight":25,"max_fails":0}`, Flags: 1682408721},
+			{Key: "dc2/test_host2/192.168.1.25:9090", Value: `{"weight":25,"max_fails":0}`, Flags: 1682408721},
+			{Key: "dc3/test_host1/192.168.0.1:9090", Value: `{"backup":1,"max_fails":0}`, Flags: 1682408721},
+			{Key: "dc3/test_host2/192.168.0.1:9090", Value: `{"backup":1,"max_fails":0}`, Flags: 1682408721},
+			{Key: "dc3/test_host2/192.168.1.25:9090", Value: `{"backup":1,"max_fails":0}`, Flags: 1682408721},
+		}, nodesV,
+	)
+
 	require.NoError(t, sd2.Clear(ip2, port))
 	nodesMap, err = sd2.ListMap()
 	require.NoError(t, err)
 	assert.Equal(
 		t, map[string]string{
 			"dc2/test_host2/192.168.1.25:9090": `{"weight":25,"max_fails":0}`,
-			"dc1/test_host2/192.168.1.25:9090": `{"backup":1, "max_fails":0}`,
-			"dc3/test_host2/192.168.1.25:9090": `{"backup":1, "max_fails":0}`,
+			"dc1/test_host2/192.168.1.25:9090": `{"backup":1,"max_fails":0}`,
+			"dc3/test_host2/192.168.1.25:9090": `{"backup":1,"max_fails":0}`,
 		}, nodesMap,
 	)
 
diff --git a/sd/utils/utils.go b/sd/utils/utils.go
index c68776cd3..defe3d907 100644
--- a/sd/utils/utils.go
+++ b/sd/utils/utils.go
@@ -18,6 +18,7 @@ var (
 type KV struct {
 	Key   string
 	Value string
+	Flags int64
 }
 
 func HttpGet(url string) ([]byte, error) {