Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
sd: add cleanup for dead hosts
Browse files Browse the repository at this point in the history
msaf1980 committed Apr 25, 2023
1 parent bf5e913 commit a21d442
Showing 4 changed files with 93 additions and 22 deletions.
14 changes: 12 additions & 2 deletions graphite-clickhouse.go
Original file line number Diff line number Diff line change
@@ -96,6 +96,7 @@ func main() {
pprof := flag.String("pprof", "", "Additional pprof listen addr for non-server modes (tagger, etc..), overrides pprof-listen from common ")

sdList := flag.Bool("sd-list", false, "List registered nodes in SD")
sdClean := flag.Bool("sd-clean", false, "Cleanup registered nodes in SD")

printVersion := flag.Bool("version", false, "Print version")
verbose := flag.Bool("verbose", false, "Verbose (print config on startup)")
@@ -124,7 +125,7 @@ func main() {
return
}

if *sdList {
if *sdList || *sdClean {
if cfg.Common.SD != "" && cfg.NeedLoadAvgColect() {
var sd sd.SD
logger := zapwriter.Default()
@@ -134,9 +135,18 @@ func main() {
default:
panic("serive discovery type not registered")
}
ts := time.Now().Unix() - 3600
if nodes, err := sd.Nodes(); err == nil {
for _, node := range nodes {
fmt.Printf("%s: %s\n", node.Key, node.Value)
if *sdClean && node.Flags > 0 {
if ts > node.Flags {
fmt.Printf("%s: %s (%s), deleted\n", node.Key, node.Value, time.Unix(node.Flags, 0).UTC().Format(time.RFC3339Nano))
} else {
fmt.Printf("%s: %s (%s)\n", node.Key, node.Value, time.Unix(node.Flags, 0).UTC().Format(time.RFC3339Nano))
}
} else {
fmt.Printf("%s: %s (%s)\n", node.Key, node.Value, time.Unix(node.Flags, 0).UTC().Format(time.RFC3339Nano))
}
}
} else {
log.Fatal(err)
35 changes: 29 additions & 6 deletions sd/nginx/nginx.go
Original file line number Diff line number Diff line change
@@ -5,6 +5,7 @@ import (
"errors"
"strconv"
"strings"
"time"

"github.com/lomik/graphite-clickhouse/sd/utils"

@@ -17,6 +18,8 @@ var (
json = jsoniter.ConfigCompatibleWithStandardLibrary
ErrNoKey = errors.New("list key no found")
ErrInvalidKey = errors.New("list key is invalid")

timeNow = time.Now
)

func splitNode(node string) (dc, host, listen string, ok bool) {
@@ -51,7 +54,7 @@ func New(url, namespace, hostname string, logger *zap.Logger) *Nginx {
sd := &Nginx{
logger: logger,
body: make([]byte, 128),
backupBody: []byte(`{"backup":1, "max_fails":0}`),
backupBody: []byte(`{"backup":1,"max_fails":0}`),
nsEnd: "upstreams/" + namespace + "/",
hostname: hostname,
}
@@ -188,19 +191,27 @@ func (sd *Nginx) Nodes() (nodes []utils.KV, err error) {
if s, ok := i.(string); ok {
if strings.HasPrefix(s, sd.nsEnd) {
s = s[len(sd.nsEnd):]
kv := utils.KV{Key: s}
if i, ok := jNode["Value"]; ok {
if v, ok := i.(string); ok {
d, err := base64.StdEncoding.DecodeString(v)
if err != nil {
return nil, err
}
nodes = append(nodes, utils.KV{Key: s, Value: stringutils.UnsafeString(d)})
} else {
nodes = append(nodes, utils.KV{Key: s, Value: ""})
kv.Value = stringutils.UnsafeString(d)
}
}
if i, ok := jNode["Flags"]; ok {
switch v := i.(type) {
case float64:
kv.Flags = int64(v)
case int:
kv.Flags = int64(v)
case int64:
kv.Flags = v
}
} else {
nodes = append(nodes, utils.KV{Key: s, Value: ""})
}
nodes = append(nodes, kv)
} else {
return nil, ErrInvalidKey
}
@@ -227,11 +238,20 @@ func (sd *Nginx) update(ip, port string, dc []string) (err error) {
}
sd.url.WriteString(port)

// add custom query flags
sd.url.WriteByte('?')
sd.url.WriteString("flags=")
sd.url.WriteInt(timeNow().Unix(), 10)

if err = utils.HttpPut(sd.url.String(), sd.body); err != nil {
sd.logger.Error("put", zap.String("address", sd.url.String()[sd.pos:]), zap.Error(err))
return
}
} else {
flags := make([]byte, 0, 32)
flags = append(flags, "?flags="...)
flags = strconv.AppendInt(flags, timeNow().Unix(), 10)

for i := 0; i < len(dc); i++ {
// cfg.Common.SDDc
sd.url.Truncate(sd.pos)
@@ -245,6 +265,9 @@ func (sd *Nginx) update(ip, port string, dc []string) (err error) {
}
sd.url.WriteString(port)

// add custom query flags
sd.url.Write(flags)

if i == 0 {
if nErr := utils.HttpPut(sd.url.String(), sd.body); nErr != nil {
sd.logger.Error(
65 changes: 51 additions & 14 deletions sd/nginx/nginx_test.go
Original file line number Diff line number Diff line change
@@ -6,6 +6,7 @@ package nginx
import (
"sort"
"testing"
"time"

"github.com/lomik/graphite-clickhouse/sd/utils"
"github.com/lomik/zapwriter"
@@ -27,6 +28,10 @@ var (
)

func TestNginx(t *testing.T) {
timeNow = func() time.Time {
return time.Unix(1682408721, 0)
}

logger := zapwriter.Default()

sd1 := New("http://127.0.0.1:8500/v1/kv/upstreams", "graphite", hostname1, logger)
@@ -126,6 +131,17 @@ func TestNginx(t *testing.T) {
"_/test_host2/192.168.0.1:9090": `{"weight":25,"max_fails":0}`,
}, nodesMap,
)

nodesV, err := sd2.Nodes()
require.NoError(t, err)
assert.Equal(
t, []utils.KV{
{Key: "_/test_host1/192.168.0.1:9090", Value: `{"weight":10,"max_fails":0}`, Flags: 1682408721},
{Key: "_/test_host2/192.168.0.1:9090", Value: `{"weight":25,"max_fails":0}`, Flags: 1682408721},
{Key: "_/test_host2/192.168.1.25:9090", Value: `{"weight":25,"max_fails":0}`, Flags: 1682408721},
}, nodesV,
)

require.NoError(t, sd2.Clear(ip2, port))
nodesMap, err = sd2.ListMap()
require.NoError(t, err)
@@ -148,6 +164,10 @@ func TestNginx(t *testing.T) {
}

func TestNginxDC(t *testing.T) {
timeNow = func() time.Time {
return time.Unix(1682408721, 0)
}

logger := zapwriter.Default()

sd1 := New("http://127.0.0.1:8500/v1/kv/upstreams", "graphite", hostname1, logger)
@@ -182,8 +202,8 @@ func TestNginxDC(t *testing.T) {
assert.Equal(
t, map[string]string{
"dc1/test_host1/192.168.0.1:9090": `{"weight":10,"max_fails":0}`,
"dc2/test_host1/192.168.0.1:9090": `{"backup":1, "max_fails":0}`,
"dc3/test_host1/192.168.0.1:9090": `{"backup":1, "max_fails":0}`,
"dc2/test_host1/192.168.0.1:9090": `{"backup":1,"max_fails":0}`,
"dc3/test_host1/192.168.0.1:9090": `{"backup":1,"max_fails":0}`,
}, nodesMap,
)

@@ -205,8 +225,8 @@ func TestNginxDC(t *testing.T) {
assert.Equal(
t, map[string]string{
"dc2/test_host2/192.168.1.25:9090": `{"weight":21,"max_fails":0}`,
"dc1/test_host2/192.168.1.25:9090": `{"backup":1, "max_fails":0}`,
"dc3/test_host2/192.168.1.25:9090": `{"backup":1, "max_fails":0}`,
"dc1/test_host2/192.168.1.25:9090": `{"backup":1,"max_fails":0}`,
"dc3/test_host2/192.168.1.25:9090": `{"backup":1,"max_fails":0}`,
}, nodesMap,
)

@@ -228,8 +248,8 @@ func TestNginxDC(t *testing.T) {
assert.Equal(
t, map[string]string{
"dc2/test_host2/192.168.1.25:9090": `{"weight":25,"max_fails":0}`,
"dc1/test_host2/192.168.1.25:9090": `{"backup":1, "max_fails":0}`,
"dc3/test_host2/192.168.1.25:9090": `{"backup":1, "max_fails":0}`,
"dc1/test_host2/192.168.1.25:9090": `{"backup":1,"max_fails":0}`,
"dc3/test_host2/192.168.1.25:9090": `{"backup":1,"max_fails":0}`,
}, nodesMap,
)

@@ -245,8 +265,8 @@ func TestNginxDC(t *testing.T) {
assert.Equal(
t, map[string]string{
"dc1/test_host1/192.168.0.1:9090": `{"weight":10,"max_fails":0}`,
"dc2/test_host1/192.168.0.1:9090": `{"backup":1, "max_fails":0}`,
"dc3/test_host1/192.168.0.1:9090": `{"backup":1, "max_fails":0}`,
"dc2/test_host1/192.168.0.1:9090": `{"backup":1,"max_fails":0}`,
"dc3/test_host1/192.168.0.1:9090": `{"backup":1,"max_fails":0}`,
}, nodesMap,
)

@@ -258,21 +278,38 @@ func TestNginxDC(t *testing.T) {
assert.Equal(
t, map[string]string{
"dc2/test_host2/192.168.1.25:9090": `{"weight":25,"max_fails":0}`,
"dc1/test_host2/192.168.1.25:9090": `{"backup":1, "max_fails":0}`,
"dc3/test_host2/192.168.1.25:9090": `{"backup":1, "max_fails":0}`,
"dc1/test_host2/192.168.1.25:9090": `{"backup":1,"max_fails":0}`,
"dc3/test_host2/192.168.1.25:9090": `{"backup":1,"max_fails":0}`,
"dc2/test_host2/192.168.0.1:9090": `{"weight":25,"max_fails":0}`,
"dc1/test_host2/192.168.0.1:9090": `{"backup":1, "max_fails":0}`,
"dc3/test_host2/192.168.0.1:9090": `{"backup":1, "max_fails":0}`,
"dc1/test_host2/192.168.0.1:9090": `{"backup":1,"max_fails":0}`,
"dc3/test_host2/192.168.0.1:9090": `{"backup":1,"max_fails":0}`,
}, nodesMap,
)

nodesV, err := sd2.Nodes()
require.NoError(t, err)
assert.Equal(
t, []utils.KV{
{Key: "dc1/test_host1/192.168.0.1:9090", Value: `{"weight":10,"max_fails":0}`, Flags: 1682408721},
{Key: "dc1/test_host2/192.168.0.1:9090", Value: `{"backup":1,"max_fails":0}`, Flags: 1682408721},
{Key: "dc1/test_host2/192.168.1.25:9090", Value: `{"backup":1,"max_fails":0}`, Flags: 1682408721},
{Key: "dc2/test_host1/192.168.0.1:9090", Value: `{"backup":1,"max_fails":0}`, Flags: 1682408721},
{Key: "dc2/test_host2/192.168.0.1:9090", Value: `{"weight":25,"max_fails":0}`, Flags: 1682408721},
{Key: "dc2/test_host2/192.168.1.25:9090", Value: `{"weight":25,"max_fails":0}`, Flags: 1682408721},
{Key: "dc3/test_host1/192.168.0.1:9090", Value: `{"backup":1,"max_fails":0}`, Flags: 1682408721},
{Key: "dc3/test_host2/192.168.0.1:9090", Value: `{"backup":1,"max_fails":0}`, Flags: 1682408721},
{Key: "dc3/test_host2/192.168.1.25:9090", Value: `{"backup":1,"max_fails":0}`, Flags: 1682408721},
}, nodesV,
)

require.NoError(t, sd2.Clear(ip2, port))
nodesMap, err = sd2.ListMap()
require.NoError(t, err)
assert.Equal(
t, map[string]string{
"dc2/test_host2/192.168.1.25:9090": `{"weight":25,"max_fails":0}`,
"dc1/test_host2/192.168.1.25:9090": `{"backup":1, "max_fails":0}`,
"dc3/test_host2/192.168.1.25:9090": `{"backup":1, "max_fails":0}`,
"dc1/test_host2/192.168.1.25:9090": `{"backup":1,"max_fails":0}`,
"dc3/test_host2/192.168.1.25:9090": `{"backup":1,"max_fails":0}`,
}, nodesMap,
)

1 change: 1 addition & 0 deletions sd/utils/utils.go
Original file line number Diff line number Diff line change
@@ -18,6 +18,7 @@ var (
type KV struct {
Key string
Value string
Flags int64
}

func HttpGet(url string) ([]byte, error) {

0 comments on commit a21d442

Please sign in to comment.