Skip to content

Commit

Permalink
Add kibana monitoring (#2)
Browse files Browse the repository at this point in the history
* Migrate to go mob instead of go dep

* Add kibana avaibility monitoring + fix cleanup of some metrics

* Remove version as it is never exported

* Add comment about allEverKnownodes

* Delegate consul service name selection to program argument
  • Loading branch information
erebe authored Mar 26, 2020
1 parent 244fe13 commit d0bb4f0
Show file tree
Hide file tree
Showing 5 changed files with 117 additions and 32 deletions.
18 changes: 4 additions & 14 deletions cmd/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ type esnode struct {
ip string
port int
cluster string
version string
}

func contains(a []string, x string) bool {
Expand All @@ -32,6 +31,8 @@ func contains(a []string, x string) bool {

func updateEverKnownNodes(allEverKnownNodes []string, nodes []esnode) []string {
for _, node := range nodes {
// TODO: Replace by a real struct instead of a string concatenation...
// also allEverKnownNodes leak memory as it never delete old/cleaned items
serializedNode := fmt.Sprintf("%v|%v", node.name, node.cluster)
if contains(allEverKnownNodes, serializedNode) == false {
allEverKnownNodes = append(allEverKnownNodes, serializedNode)
Expand All @@ -51,17 +52,7 @@ func clusterNameFromTags(serviceTags []string) string {
return ""
}

func versionFromTags(serviceTags []string) string {
for _, tag := range serviceTags {
splitted := strings.SplitN(tag, "-", 2)
if splitted[0] == "version" {
return splitted[1]
}
}
return ""
}

func discoverEsNodes() ([]esnode, error) {
func discoverNodesForService(serviceName string) ([]esnode, error) {
start := time.Now()

consulConfig := api.DefaultConfig()
Expand All @@ -74,7 +65,7 @@ func discoverEsNodes() ([]esnode, error) {
}

catalogServices, _, err := consul.Catalog().Service(
"elasticsearch-all", "",
serviceName, "",
&api.QueryOptions{AllowStale: true, RequireConsistent: false, UseCache: true},
)
if err != nil {
Expand All @@ -92,7 +83,6 @@ func discoverEsNodes() ([]esnode, error) {
ip: svc.Address,
port: svc.ServicePort,
cluster: clusterNameFromTags(svc.ServiceTags),
version: versionFromTags(svc.ServiceTags),
})
}

Expand Down
56 changes: 52 additions & 4 deletions cmd/probing.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
"github.com/valyala/fastjson"
)

func probeNode(node *esnode, updateProbingPeriod time.Duration) error {
func probeElasticsearchNode(node *esnode, updateProbingPeriod time.Duration) error {
client := &http.Client{
Timeout: updateProbingPeriod - 2*time.Second,
}
Expand All @@ -26,7 +26,7 @@ func probeNode(node *esnode, updateProbingPeriod time.Duration) error {
if err != nil {
log.Debug("Probing failed for ", node.name, ": ", probingURL, " ", err.Error())
log.Error(err)
nodeAvailabilityGauge.WithLabelValues(node.cluster, node.name).Set(0)
elasticNodeAvailabilityGauge.WithLabelValues(node.cluster, node.name).Set(0)
errorsCount.Inc()
return err
}
Expand All @@ -35,7 +35,7 @@ func probeNode(node *esnode, updateProbingPeriod time.Duration) error {
log.Debug("Probe result for ", node.name, ": ", resp.Status)
if resp.StatusCode != 200 {
log.Error("Probing failed for ", node.name, ": ", probingURL, " ", resp.Status)
nodeAvailabilityGauge.WithLabelValues(node.cluster, node.name).Set(0)
elasticNodeAvailabilityGauge.WithLabelValues(node.cluster, node.name).Set(0)
errorsCount.Inc()
return fmt.Errorf("ES Probing failed")
}
Expand All @@ -50,8 +50,56 @@ func probeNode(node *esnode, updateProbingPeriod time.Duration) error {
}
}

nodeAvailabilityGauge.WithLabelValues(node.cluster, node.name).Set(1)
elasticNodeAvailabilityGauge.WithLabelValues(node.cluster, node.name).Set(1)
nodeSearchLatencySummary.WithLabelValues(node.cluster, node.name).Observe(durationNanosec)

return nil
}

func probeKibanaNode(node *esnode, updateProbingPeriod time.Duration) error {
client := &http.Client{
Timeout: updateProbingPeriod - 2*time.Second,
}

probingURL := fmt.Sprintf("http://%v:%v/api/status", node.ip, node.port)
log.Debug("Start probing ", node.name)

resp, err := client.Get(probingURL)
if err != nil {
log.Debug("Probing failed for ", node.name, ": ", probingURL, " ", err.Error())
log.Error(err)
kibanaNodeAvailabilityGauge.WithLabelValues(node.cluster, node.name).Set(0)
errorsCount.Inc()
return err
}

log.Debug("Probe result for ", node.name, ": ", resp.Status)
if resp.StatusCode != 200 {
log.Error("Probing failed for ", node.name, ": ", probingURL, " ", resp.Status)
kibanaNodeAvailabilityGauge.WithLabelValues(node.cluster, node.name).Set(0)
errorsCount.Inc()
return fmt.Errorf("kibana Probing failed")
}

body, readErr := ioutil.ReadAll(resp.Body)
if readErr != nil {
kibanaNodeAvailabilityGauge.WithLabelValues(node.cluster, node.name).Set(0)
return fmt.Errorf("kibana Probing failed: %s", readErr)
}

var p fastjson.Parser
json, jsonErr := p.Parse(string(body))
if jsonErr != nil {
kibanaNodeAvailabilityGauge.WithLabelValues(node.cluster, node.name).Set(0)
return fmt.Errorf("kibana Probing failed: %s", jsonErr)
}
nodeState := string(json.GetStringBytes("status", "overall", "state"))
if nodeState != "green" {
kibanaNodeAvailabilityGauge.WithLabelValues(node.cluster, node.name).Set(0)
return fmt.Errorf("kibana Probing failed: node not in a green/healthy state")
}

kibanaNodeAvailabilityGauge.WithLabelValues(node.cluster, node.name).Set(1)

return nil
}
15 changes: 13 additions & 2 deletions cmd/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,14 +42,22 @@ var (
[]string{"cluster", "nodename"},
)

nodeAvailabilityGauge = promauto.NewGaugeVec(
elasticNodeAvailabilityGauge = promauto.NewGaugeVec(
prometheus.GaugeOpts{
Name: "es_node_availability",
Help: "Reflects node availabity : 1 is OK, 0 means node unavailable ",
},
[]string{"cluster", "nodename"},
)

kibanaNodeAvailabilityGauge = promauto.NewGaugeVec(
prometheus.GaugeOpts{
Name: "kibana_node_availability",
Help: "Reflects node availabity : 1 is OK, 0 means node unavailable ",
},
[]string{"cluster", "nodename"},
)

nodeSearchLatencySummary = promauto.NewSummaryVec(
prometheus.SummaryOpts{
Name: "es_node_search_latency",
Expand Down Expand Up @@ -102,8 +110,11 @@ func cleanMetrics(nodes []esnode, allEverKnownNodes []string) error {
}
if deleteThisNodeMetrics {
log.Info("Metrics removed for vanished node ", n[0], " from cluster ", n[1])
nodeAvailabilityGauge.DeleteLabelValues(n[1], n[0])
elasticNodeAvailabilityGauge.DeleteLabelValues(n[1], n[0])
nodeSearchLatencySummary.DeleteLabelValues(n[1], n[0])
shardsSuccessfulGauge.DeleteLabelValues(n[1], n[0])
docsHitGauge.DeleteLabelValues(n[1], n[0])
kibanaNodeAvailabilityGauge.DeleteLabelValues(n[1], n[0])
}
}

Expand Down
4 changes: 4 additions & 0 deletions cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ var probePeriod string
var cleanMetricsPeriod string
var metricsPort int
var loglevel string
var elasticsearchConsulService string
var kibanaConsulService string

// rootCmd represents the base command when called without any subcommands
var rootCmd = &cobra.Command{
Expand Down Expand Up @@ -56,6 +58,8 @@ func init() {
rootCmd.PersistentFlags().StringVar(&consulPeriod, "consulPeriod", "120s", "nodes discovery update interval")
rootCmd.PersistentFlags().StringVar(&cleanMetricsPeriod, "cleaningPeriod", "600s", "prometheus metrics cleaning interval (for vanished nodes)")
rootCmd.PersistentFlags().StringVar(&probePeriod, "probePeriod", "30s", "elasticsearch nodes probing interval")
rootCmd.PersistentFlags().StringVar(&elasticsearchConsulService, "elasticsearchConsulService", "elasticsearch-all", "elasticsearch consul service")
rootCmd.PersistentFlags().StringVar(&kibanaConsulService, "kibanaConsulService", "kibana-all", "kibana consul service")
rootCmd.PersistentFlags().IntVarP(&metricsPort, "metricsPort", "p", 2112, "port where prometheus will expose metrics to")
rootCmd.PersistentFlags().StringVarP(&loglevel, "loglevel", "l", "info", "log level")

Expand Down
56 changes: 44 additions & 12 deletions cmd/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,21 @@ Expose all measures using a prometheus compliant HTTP endpoint.`,
startMetricsEndpoint()

log.Info("Discovering ES nodes for the first time")
allEverKnownNodes := []string{}
nodesList, err := discoverEsNodes()
var allEverKnownEsNodes []string
esNodesList, err := discoverNodesForService(elasticsearchConsulService)
if err != nil {
errorsCount.Inc()
log.Fatal("Impossible to discover ES datanodes during bootstrap, exiting")
}
allEverKnownNodes = updateEverKnownNodes(allEverKnownNodes, nodesList)
allEverKnownEsNodes = updateEverKnownNodes(allEverKnownEsNodes, esNodesList)

var allEverKnownKibanaNodes []string
kibanaNodesList, err := discoverNodesForService(kibanaConsulService)
if err != nil {
errorsCount.Inc()
log.Fatal("Impossible to discover kibana nodes during bootstrap, exiting")
}
allEverKnownEsNodes = updateEverKnownNodes(allEverKnownKibanaNodes, kibanaNodesList)

log.Info("Initializing tickers")
updateDiscoveryPeriod, err := time.ParseDuration(consulPeriod)
Expand Down Expand Up @@ -72,36 +80,60 @@ Expose all measures using a prometheus compliant HTTP endpoint.`,
select {
case <-cleanMetricsTicker.C:
log.Info("Cleaning Prometheus metrics for unreferenced nodes")
cleanMetrics(nodesList, allEverKnownNodes)
cleanMetrics(esNodesList, allEverKnownEsNodes)
cleanMetrics(kibanaNodesList, allEverKnownKibanaNodes)

case <-updateDiscoveryTicker.C:
// Elasticsearch
log.Debug("Starting updating ES nodes list")

updatedList, err := discoverEsNodes()
updatedList, err := discoverNodesForService(elasticsearchConsulService)
if err != nil {
log.Error("Unable to update ES nodes, using last known state")
errorsCount.Inc()
continue
}

log.Info("Updating nodes list")
allEverKnownNodes = updateEverKnownNodes(allEverKnownNodes, updatedList)
nodesList = updatedList
log.Info("Updating ES nodes list")
allEverKnownEsNodes = updateEverKnownNodes(allEverKnownEsNodes, updatedList)
esNodesList = updatedList

// Kibana
log.Debug("Starting updating Kibana nodes list")
kibanaUpdatedList, err := discoverNodesForService(kibanaConsulService)
if err != nil {
log.Error("Unable to update Kibana nodes, using last known state")
errorsCount.Inc()
continue
}

log.Info("Updating kibana nodes list")
allEverKnownKibanaNodes = updateEverKnownNodes(allEverKnownKibanaNodes, kibanaUpdatedList)
kibanaNodesList = kibanaUpdatedList

case <-executeProbingTicker.C:
log.Debug("Starting probing ES nodes")

sem := new(sync.WaitGroup)
for _, node := range nodesList {
for _, node := range esNodesList {
sem.Add(1)
go func(loopnode esnode) {
go func(esNode esnode) {
defer sem.Done()
probeNode(&loopnode, updateProbingPeriod)
probeElasticsearchNode(&esNode, updateProbingPeriod)
}(node)

}
sem.Wait()

log.Debug("Starting probing Kibana nodes")
for _, node := range esNodesList {
sem.Add(1)
go func(kibanaNode esnode) {
defer sem.Done()
probeKibanaNode(&kibanaNode, updateProbingPeriod)
}(node)

}
sem.Wait()
}
}
},
Expand Down

0 comments on commit d0bb4f0

Please sign in to comment.