Skip to content

Commit

Permalink
sd: cleanup on shutdown
Browse files Browse the repository at this point in the history
  • Loading branch information
msaf1980 committed Aug 8, 2023
1 parent f127d52 commit 28f529c
Show file tree
Hide file tree
Showing 3 changed files with 94 additions and 21 deletions.
56 changes: 50 additions & 6 deletions graphite-clickhouse.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package main

import (
"context"
"encoding/json"
"flag"
"fmt"
Expand All @@ -9,8 +10,12 @@ import (
"math/rand"
"net/http"
_ "net/http/pprof"
"os"
"os/signal"
"runtime"
"runtime/debug"
"sync"
"syscall"
"time"

"github.com/lomik/zapwriter"
Expand Down Expand Up @@ -80,7 +85,10 @@ func (app *App) Handler(handler http.Handler) http.Handler {
})
}

var BuildVersion = "(development build)"
var (
BuildVersion = "(development build)"
srv *http.Server
)

func main() {
rand.Seed(time.Now().UnixNano())
Expand Down Expand Up @@ -137,14 +145,15 @@ func main() {
case config.SDNginx:
sd = nginx.New(cfg.Common.SD, cfg.Common.SDNamespace, "", logger)
default:
panic("service discovery type not registered")
panic(fmt.Errorf("service discovery type %q can be registered", cfg.Common.SDType.String()))
}
ts := time.Now().Unix() - 3600
if nodes, err := sd.Nodes(); err == nil {
for _, node := range nodes {
if *sdClean && node.Flags > 0 {
if ts > node.Flags {
fmt.Printf("%s: %s (%s), deleted\n", node.Key, node.Value, time.Unix(node.Flags, 0).UTC().Format(time.RFC3339Nano))
// sd.Delete(node.Key, node.Value)
} else {
fmt.Printf("%s: %s (%s)\n", node.Key, node.Value, time.Unix(node.Flags, 0).UTC().Format(time.RFC3339Nano))
}
Expand All @@ -155,6 +164,8 @@ func main() {
} else {
log.Fatal(err)
}
} else {
fmt.Fprintln(os.Stderr, "SD not enabled")
}
return
}
Expand Down Expand Up @@ -261,10 +272,43 @@ func main() {
metrics.Graphite.Start(nil)
}

if cfg.NeedLoadAvgColect() {
sdLogger := localManager.Logger("service discovery")
go sd.Register(cfg, sdLogger)
var exitWait sync.WaitGroup
srv = &http.Server{Addr: cfg.Common.Listen}

exitWait.Add(1)

go func() {
defer exitWait.Done()
if err := srv.ListenAndServe(); err != http.ErrServerClosed {
// unexpected error. port in use?
log.Fatalf("ListenAndServe(): %v", err)
}
}()

go func() {
stop := make(chan os.Signal, 1)
signal.Notify(stop, syscall.SIGTERM, syscall.SIGINT)
<-stop
logger.Info("stoping graphite-clickhouse")
if cfg.Common.SDType != config.SDNone {
// unregister SD
sd.Stop()
time.Sleep(10 * time.Second)
}
// initiating the shutdown
ctx, _ := context.WithTimeout(context.Background(), time.Second*10)
srv.Shutdown(ctx)
}()

if cfg.Common.SD != "" {
go func() {
time.Sleep(time.Millisecond * 100)
sdLogger := localManager.Logger("service discovery")
sd.Register(cfg, sdLogger)
}()
}

log.Fatal(http.ListenAndServe(cfg.Common.Listen, mux))
exitWait.Wait()

logger.Info("stop graphite-clickhouse")
}
25 changes: 17 additions & 8 deletions sd/nginx/nginx.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,21 @@ import (
"go.uber.org/zap"
)

var (
json = jsoniter.ConfigCompatibleWithStandardLibrary
ErrNoKey = errors.New("list key no found")
ErrInvalidKey = errors.New("list key is invalid")
type ErrInvalidKey struct {
val string
}

func (e ErrInvalidKey) Error() string {
if e.val == "" {
return "list key is invalid"
}
return "list key is invalid: '" + e.val + "'"
}

timeNow = time.Now
var (
json = jsoniter.ConfigCompatibleWithStandardLibrary
ErrNoKey = errors.New("list key no found")
timeNow = time.Now
)

func splitNode(node string) (dc, host, listen string, ok bool) {
Expand Down Expand Up @@ -106,7 +115,7 @@ func (sd *Nginx) List() (nodes []string, err error) {
nodes = append(nodes, s)
}
} else {
return nil, ErrInvalidKey
return nil, ErrInvalidKey{s}
}
} else {
return nil, ErrNoKey
Expand Down Expand Up @@ -157,7 +166,7 @@ func (sd *Nginx) ListMap() (nodes map[string]string, err error) {
}
}
} else {
return nil, ErrInvalidKey
return nil, ErrInvalidKey{s}
}
} else {
return nil, ErrNoKey
Expand Down Expand Up @@ -213,7 +222,7 @@ func (sd *Nginx) Nodes() (nodes []utils.KV, err error) {
}
nodes = append(nodes, kv)
} else {
return nil, ErrInvalidKey
return nil, ErrInvalidKey{s}
}
} else {
return nil, ErrNoKey
Expand Down
34 changes: 27 additions & 7 deletions sd/register.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package sd

import (
"context"
"os"
"strings"
"time"
Expand All @@ -14,8 +13,10 @@ import (
)

var (
ctxMain, Stop = context.WithCancel(context.Background())
delay = time.Second * 10
// ctxMain, Stop = context.WithCancel(context.Background())
stop chan struct{} = make(chan struct{}, 1)
delay = time.Second * 10
hostname string
)

type SD interface {
Expand All @@ -34,7 +35,6 @@ func Register(cfg *config.Config, logger *zap.Logger) {
listenIP string
prevIP string
registerFirst bool
hostname string
sd SD
err error
load float64
Expand All @@ -59,11 +59,14 @@ func Register(cfg *config.Config, logger *zap.Logger) {
if err == nil {
load_avg.Store(load)
}

logger.Info("init sd",
zap.String("hostname", hostname),
)

w = load_avg.Weight(cfg.Common.BaseWeight, load)
sd.Update(listenIP, cfg.Common.Listen, cfg.Common.SDDc, w)
sd.Clear(listenIP, cfg.Common.Listen)

defer sd.Clear("", "")
}
LOOP:
for {
Expand All @@ -90,8 +93,25 @@ LOOP:
select {
case <-t:
continue
case <-ctxMain.Done():
case <-stop:
break LOOP
}
}

if sd != nil {
if err := sd.Clear("", ""); err == nil {
logger.Info("cleanup sd",
zap.String("hostname", hostname),
)
} else {
logger.Warn("cleanup sd",
zap.String("hostname", hostname),
zap.Error(err),
)
}
}
}

func Stop() {
stop <- struct{}{}
}

0 comments on commit 28f529c

Please sign in to comment.