Skip to content
This repository has been archived by the owner on Mar 9, 2022. It is now read-only.

Commit

Permalink
writer: Use Prometheus metric for subscription drop stats
Browse files Browse the repository at this point in the history
  • Loading branch information
mjs committed Apr 10, 2018
1 parent e037211 commit 56abc7f
Showing 1 changed file with 17 additions and 17 deletions.
34 changes: 17 additions & 17 deletions writer/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ import (

"github.com/jumptrading/influx-spout/config"
"github.com/jumptrading/influx-spout/filter"
"github.com/jumptrading/influx-spout/lineformatter"
"github.com/jumptrading/influx-spout/stats"
)

Expand Down Expand Up @@ -241,17 +240,15 @@ func (w *Writer) sendBatch(batch *batchBuffer, client *http.Client) error {
return nil
}

var dropLine = lineformatter.New("writer_drop", nil, "total", "diff")

func (w *Writer) signalDrop(drop, last int) {
func (w *Writer) signalDrop(subject string, drop, last int) {
// uh, this writer is overloaded and had to drop a packet
log.Printf("Warning: dropped %d (now %d dropped in total)\n", drop-last, drop)
log.Printf("Warning: dropped %d for subject %q (total dropped: %d)", drop-last, subject, drop)

// publish to the monitor subject, so grafana can pick it up and report failures
w.nc.Publish(w.c.NATSSubjectMonitor, dropLine.FormatT(time.Now(), nil, drop, drop-last))
labels := w.metricsLabels()
labels["subject"] = subject

// the fact the we dropped a packet MUST reach the server
// immediately so we can investigate
line := stats.CounterToPrometheus("dropped", drop, time.Now(), labels)
w.nc.Publish(w.c.NATSSubjectMonitor, line)
w.nc.Flush()
}

Expand All @@ -271,7 +268,7 @@ func (w *Writer) monitorSub(sub *nats.Subscription) {
}

if drop != last {
w.signalDrop(drop, last)
w.signalDrop(sub.Subject, drop, last)
}
last = drop

Expand All @@ -289,13 +286,7 @@ func (w *Writer) monitorSub(sub *nats.Subscription) {
func (w *Writer) startStatistician() {
defer w.wg.Done()

labels := map[string]string{
"writer": w.c.Name,
"influxdb_address": w.c.InfluxDBAddress,
"influxdb_port": strconv.Itoa(w.c.InfluxDBPort),
"influxdb_dbname": w.c.DBName,
}

labels := w.metricsLabels()
for {
lines := stats.SnapshotToPrometheus(w.stats.Snapshot(), time.Now(), labels)
w.nc.Publish(w.c.NATSSubjectMonitor, lines)
Expand All @@ -307,3 +298,12 @@ func (w *Writer) startStatistician() {
}
}
}

func (w *Writer) metricsLabels() map[string]string {
return map[string]string{
"writer": w.c.Name,
"influxdb_address": w.c.InfluxDBAddress,
"influxdb_port": strconv.Itoa(w.c.InfluxDBPort),
"influxdb_dbname": w.c.DBName,
}
}

0 comments on commit 56abc7f

Please sign in to comment.