Skip to content
This repository has been archived by the owner on Mar 9, 2022. It is now read-only.

Commit

Permalink
writer: Emit metrics in Prometheus format
Browse files Browse the repository at this point in the history
  • Loading branch information
mjs committed Apr 10, 2018
1 parent 8f96283 commit 746055f
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 45 deletions.
50 changes: 17 additions & 33 deletions writer/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,9 @@ import (

// Writer stats counters
const (
batchesReceived = "batches-received"
writeRequests = "write-requests"
failedWrites = "failed-writes"
statReceived = "received"
statWriteRequests = "write_requests"
statFailedWrites = "failed_writes"
)

type Writer struct {
Expand All @@ -64,7 +64,7 @@ func StartWriter(c *config.Config) (_ *Writer, err error) {
url: fmt.Sprintf("http://%s:%d/write?db=%s", c.InfluxDBAddress, c.InfluxDBPort, c.DBName),
batchMaxBytes: c.BatchMaxMB * 1024 * 1024,
batchMaxAge: time.Duration(c.BatchMaxSecs) * time.Second,
stats: stats.New(batchesReceived, writeRequests, failedWrites),
stats: stats.New(statReceived, statWriteRequests, statFailedWrites),
stop: make(chan struct{}),
}
defer func() {
Expand Down Expand Up @@ -160,7 +160,7 @@ func (w *Writer) worker(jobs <-chan *nats.Msg) {
for {
select {
case j := <-jobs:
w.stats.Inc(batchesReceived)
w.stats.Inc(statReceived)
batchWrite(j.Data)
case <-time.After(time.Second):
// Wake up regularly to check batch age
Expand All @@ -169,10 +169,10 @@ func (w *Writer) worker(jobs <-chan *nats.Msg) {
}

if w.shouldSendBatch(batch) {
w.stats.Inc(writeRequests)
w.stats.Inc(statWriteRequests)

if err := w.sendBatch(batch, client); err != nil {
w.stats.Inc(failedWrites)
w.stats.Inc(statFailedWrites)
log.Printf("Error: %v", err)
}

Expand Down Expand Up @@ -284,37 +284,21 @@ func (w *Writer) monitorSub(sub *nats.Subscription) {
}
}

// This goroutine is responsible for monitoring the statistics and
// sending it to the monitoring backend.
func (w *Writer) startStatistician() {
defer w.wg.Done()

// This goroutine is responsible for monitoring the statistics and
// sending it to the monitoring backend.
statsLine := lineformatter.New(
"spout_stat_writer",
[]string{ // tag keys
"writer",
"influxdb_address",
"influxdb_port",
"influxdb_dbname",
},
"received",
"write_requests",
"failed_writes",
)
tagVals := []string{
w.c.Name,
w.c.InfluxDBAddress,
strconv.Itoa(w.c.InfluxDBPort),
w.c.DBName,
labels := map[string]string{
"writer": w.c.Name,
"influxdb_address": w.c.InfluxDBAddress,
"influxdb_port": strconv.Itoa(w.c.InfluxDBPort),
"influxdb_dbname": w.c.DBName,
}

for {
stats := w.stats.Clone()
w.nc.Publish(w.c.NATSSubjectMonitor, statsLine.Format(
tagVals,
stats.Get(batchesReceived),
stats.Get(writeRequests),
stats.Get(failedWrites),
))
lines := stats.SnapshotToPrometheus(w.stats.Snapshot(), time.Now(), labels)
w.nc.Publish(w.c.NATSSubjectMonitor, lines)

select {
case <-time.After(3 * time.Second):
Expand Down
26 changes: 14 additions & 12 deletions writer/writer_medium_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"fmt"
"io/ioutil"
"net/http"
"strconv"
"strings"
"sync"
"testing"
Expand Down Expand Up @@ -71,9 +70,9 @@ func TestBasicWriter(t *testing.T) {
defer w.Stop()

// Subscribe to stats output.
statsCh := make(chan string, 10)
monitorCh := make(chan string, 10)
_, err := nc.Subscribe(conf.NATSSubjectMonitor, func(msg *nats.Msg) {
statsCh <- string(msg.Data)
monitorCh <- string(msg.Data)
})
require.NoError(t, err)

Expand All @@ -95,15 +94,18 @@ func TestBasicWriter(t *testing.T) {
}
}

// Check the stats output.
spouttest.AssertRecvMulti(t, statsCh, "stats",
strings.Join([]string{
"spout_stat_writer",
"writer=foo",
"influxdb_address=localhost",
"influxdb_port=" + strconv.Itoa(influxPort),
"influxdb_dbname=metrics",
}, ",")+" received=5,write_requests=5,failed_writes=0\n")
// Check the monitor output.
labels := "{" + strings.Join([]string{
`influxdb_address="localhost"`,
`influxdb_dbname="metrics"`,
fmt.Sprintf(`influxdb_port="%d"`, influxPort),
`writer="foo"`,
}, ",") + "}"
spouttest.AssertMonitor(t, monitorCh, []string{
`received` + labels + ` 5`,
`write_requests` + labels + ` 5`,
`failed_writes` + labels + ` 0`,
})
}

func TestBatchMBLimit(t *testing.T) {
Expand Down

0 comments on commit 746055f

Please sign in to comment.