This repository has been archived by the owner on Mar 9, 2022. It is now read-only.

Use monitor component #64

Merged · 16 commits · Apr 17, 2018
93 changes: 59 additions & 34 deletions README.md
@@ -2,22 +2,23 @@

## Overview

influx-spout is an open source messaging system that routes and processes
[InfluxDB line protocol] metrics from agents (for example [Telegraf]) to
processing and storage backends ([InfluxDB], [Kapacitor] etc.).

Key features:

- Proven ability to handle high volumes of data (>500k points per second)
in a production environment
- Horizontal scalability
- Multithreaded data processing within components
- Components can be distributed on multiple servers or a fleet of containers
- Ability to add and remove endpoints without disrupting existing data flows
- Fine-grained, regexp-based control over routing of metrics data to specific
backends
- Sanity checking of the data stream that prevents corrupt metrics data from
reaching backends
- Batching of outgoing data to larger chunks, making it easier for backends
to handle high-volume dataflows
- Leverages the high-performance [NATS] messaging system

@@ -43,23 +44,23 @@ flow of data between the various components:
+-----------------+ +-----------------+
| |
v v
+----------+ +-------------------------------------------------+ +---------+
| |<--+ | | |
| Filter | | NATS +-->| Monitor |
| +-->| | | |
+----------+ +--------+----------------+----------------+------+ +----+----+
| | | |
v v v |
+----------+ +----------+ +----------+ |
| | | | | | |
| Writer | | Writer | | Writer | |
| | | | | | |
+-----+----+ +-----+----+ +-----+----+ |
| | | |
v v v V
+----------+ +----------+ +----------+ +----------------+
| InfluxDB | | InfluxDB | | InfluxDB | | Metrics (HTTP) |
+----------+ +----------+ +----------+ +----------------+
```

All the influx-spout components may be run on a single host or may be
@@ -131,8 +132,8 @@ batch = 10
# support higher receive rates.
read_buffer_bytes = 4194304

# The listener will publish its own metrics to this NATS subject (for consumption
# by the monitor).
nats_subject_monitor = "influx-spout-monitor"
```

@@ -175,8 +176,8 @@ read_buffer_bytes = 4194304
# defaults to 1 MB).
listener_batch_bytes = 1048576

# The HTTP listener will publish its own metrics to this NATS subject (for
# consumption by the monitor).
nats_subject_monitor = "influx-spout-monitor"
```

@@ -201,8 +202,8 @@ nats_subject = ["influx-spout"]
# Measurements which do not match any rule (below) are sent to this NATS subject.
nats_subject_junkyard = "influx-spout-junk"

# The filter will publish its own metrics to this NATS subject (for consumption
# by the monitor).
nats_subject_monitor = "influx-spout-monitor"

# The number of filter workers to spawn.
@@ -313,8 +314,8 @@ write_timeout_secs = 30
# this limit is reached. This helps to deal with slow InfluxDB instances.
nats_pending_max_mb = 200

# The writer will publish its own metrics to this NATS subject (for consumption
# by the monitor).
nats_subject_monitor = "influx-spout-monitor"
```

@@ -326,6 +327,30 @@ measurements which don't match a rule will be dropped by the writer instead of
being written to InfluxDB. Rule configuration is the same as for the filter
component, but the rule subject should be omitted.
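For illustration only — the `[[rule]]` table and its keys here assume the filter's rule schema, and the regex is made up — a writer rule restricting output to CPU measurements might look like:

```toml
# Hypothetical writer rule: only measurements matching this regex are
# written to InfluxDB. Note there is no "subject" key -- writer rules
# omit it, unlike filter rules.
[[rule]]
type = "regex"
match = "^cpu"
```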

### Monitor

The monitor is responsible for collecting metrics from the other
influx-spout components and serving them over HTTP in the [Prometheus
exposition format](https://prometheus.io/docs/instrumenting/exposition_formats/).
Metrics are available at `/metrics` on the configured port.
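Each line in that format is a metric name, optional `{label="value"}` pairs, the sample value, and a millisecond timestamp. A minimal sketch of such a formatter (a hypothetical stand-in, not influx-spout's actual `stats` package):

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// counterToPrometheus renders one counter as a Prometheus exposition
// line. Label names are sorted so the output is deterministic.
func counterToPrometheus(name string, labels map[string]string, value int, tsMillis int64) string {
	keys := make([]string, 0, len(labels))
	for k := range labels {
		keys = append(keys, k)
	}
	sort.Strings(keys)

	pairs := make([]string, 0, len(keys))
	for _, k := range keys {
		pairs = append(pairs, fmt.Sprintf("%s=%q", k, labels[k]))
	}
	return fmt.Sprintf("%s{%s} %d %d", name, strings.Join(pairs, ","), value, tsMillis)
}

func main() {
	fmt.Println(counterToPrometheus("passed",
		map[string]string{"component": "filter", "name": "my-filter"},
		42, 1523900000000))
	// prints: passed{component="filter",name="my-filter"} 42 1523900000000
}
```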

The supported configuration options for the monitor mode follow. Defaults are
shown.

```toml
mode = "monitor" # Required

# Address of NATS server.
nats_address = "nats://localhost:4222"

# The NATS subject used by influx-spout components to report metrics to the monitor.
# The monitor will consume and aggregate metrics sent to this subject.
nats_subject_monitor = "influx-spout-monitor"

# The TCP port where the monitor will serve Prometheus formatted metrics over HTTP.
port = 9331
```
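Under the hood this amounts to an HTTP handler on `/metrics` returning the aggregated text. A self-contained sketch — the handler and sample line are illustrative, not influx-spout's implementation, and `httptest` is used so nothing binds port 9331:

```go
package main

import (
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
)

// serveAndFetch registers a /metrics handler that returns the given
// exposition-format text, then scrapes it once over HTTP.
func serveAndFetch(metrics string) string {
	mux := http.NewServeMux()
	mux.HandleFunc("/metrics", func(w http.ResponseWriter, r *http.Request) {
		fmt.Fprint(w, metrics)
	})

	srv := httptest.NewServer(mux) // ephemeral port instead of 9331
	defer srv.Close()

	resp, err := http.Get(srv.URL + "/metrics")
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	body, err := io.ReadAll(resp.Body)
	if err != nil {
		panic(err)
	}
	return string(body)
}

func main() {
	fmt.Print(serveAndFetch("passed{component=\"filter\"} 42\n"))
}
```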

## Running tests

influx-spout's tests are classified as either "small", "medium" or "large",
15 changes: 11 additions & 4 deletions config/config.go
@@ -101,11 +101,18 @@ func NewConfigFromFile(fileName string) (*Config, error) {
if conf.Name == "" {
conf.Name = pathToConfigName(fileName)
}

if conf.Port == 0 {
switch conf.Mode {
case "listener":
conf.Port = 10001
case "listener_http":
conf.Port = 13337
case "monitor":
conf.Port = 9331
}
}

return conf, nil
}

6 changes: 6 additions & 0 deletions config/config_small_test.go
@@ -120,6 +120,12 @@ func TestDefaultPortHTTPListener(t *testing.T) {
assert.Equal(t, 13337, conf.Port)
}

func TestDefaultPortMonitor(t *testing.T) {
conf, err := parseConfig(`mode = "monitor"`)
require.NoError(t, err)
assert.Equal(t, 9331, conf.Port)
}

func TestNoMode(t *testing.T) {
_, err := parseConfig("")
assert.EqualError(t, err, "mode not specified in config")
98 changes: 61 additions & 37 deletions filter/filter.go
@@ -18,22 +18,22 @@ package filter
import (
"fmt"
"log"
"sort"
"strings"
"sync"
"time"

"github.com/jumptrading/influx-spout/config"
"github.com/jumptrading/influx-spout/stats"
"github.com/nats-io/go-nats"
)

// Name for supported stats
const (
statPassed = "passed"
statProcessed = "processed"
statRejected = "rejected"
statInvalidTime = "invalid_time"
)

// StartFilter creates a Filter instance, sets up its rules based on
@@ -109,10 +109,10 @@ func (f *Filter) natsConnect() (natsConn, error) {
func initStats(rules *RuleSet) *stats.Stats {
// Initialise
statNames := []string{
statPassed,
statProcessed,
statRejected,
statInvalidTime,
}
for i := 0; i < rules.Count(); i++ {
statNames = append(statNames, ruleToStatsName(i))
@@ -155,40 +155,34 @@ func (f *Filter) Stop() {
// startStatistician defines a goroutine that is responsible for
// regularly sending the filter's statistics to the monitoring
// backend.
func (f *Filter) startStatistician(st *stats.Stats, rules *RuleSet) {
defer f.wg.Done()

generalLabels := map[string]string{
"component": "filter",
"name": f.c.Name,
}

for {
now := time.Now()
snap, ruleCounts := splitSnapshot(st.Snapshot())

// publish the general stats
lines := stats.SnapshotToPrometheus(snap, now, generalLabels)
f.nc.Publish(f.c.NATSSubjectMonitor, lines)

// publish the per rule stats
for i, subject := range rules.Subjects() {
f.nc.Publish(f.c.NATSSubjectMonitor, stats.CounterToPrometheus(
"triggered",
ruleCounts[i],
now,
map[string]string{
"component": "filter",
"name": f.c.Name,
"rule": subject,
},
))
}

select {
@@ -199,8 +193,38 @@ func (f *Filter) startStatistician(stats *stats.Stats, rules *RuleSet) {
}
}

const rulePrefix = "rule-"

// ruleToStatsName converts a rule index to a name to a key for use
// with a stats.Stats instance.
func ruleToStatsName(i int) string {
return fmt.Sprintf("%s%06d", rulePrefix, i)
}

// splitSnapshot takes a Snapshot and splits out the rule counters
// from the others. The rule counters are returned in an ordered slice
// while the other counters are returned as a new (smaller) Snapshot.
func splitSnapshot(snap stats.Snapshot) (stats.Snapshot, []int) {
var genSnap stats.Snapshot
var ruleSnap stats.Snapshot

// Split up rule counters from the others.
for _, counter := range snap {
if strings.HasPrefix(counter.Name, rulePrefix) {
ruleSnap = append(ruleSnap, counter)
} else {
genSnap = append(genSnap, counter)
}
}

// Sort the rule counters by name and extract just the counts.
sort.Slice(ruleSnap, func(i, j int) bool {
return ruleSnap[i].Name < ruleSnap[j].Name
})
ruleCounts := make([]int, len(ruleSnap))
for i, counter := range ruleSnap {
ruleCounts[i] = counter.Value
}

return genSnap, ruleCounts
}