Skip to content
This repository has been archived by the owner on Mar 9, 2022. It is now read-only.

Commit

Permalink
filter: New "failed_nats_publish" metric
Browse files Browse the repository at this point in the history
Previously, errors while publishing to NATS were being silently
ignored. This now increments the new metric and generates logs if
Debug is enabled.
  • Loading branch information
mjs committed Apr 24, 2018
1 parent 4ae2fed commit 9be52aa
Show file tree
Hide file tree
Showing 4 changed files with 22 additions and 7 deletions.
12 changes: 7 additions & 5 deletions filter/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,12 @@ import (

// Name for supported stats
const (
statPassed = "passed"
statProcessed = "processed"
statRejected = "rejected"
statInvalidTime = "invalid_time"
statNATSDropped = "nats_dropped"
statPassed = "passed"
statProcessed = "processed"
statRejected = "rejected"
statInvalidTime = "invalid_time"
statFailedNATSPublish = "failed_nats_publish"
statNATSDropped = "nats_dropped"
)

// StartFilter creates a Filter instance, sets up its rules based on
Expand Down Expand Up @@ -114,6 +115,7 @@ func initStats(rules *RuleSet) *stats.Stats {
statProcessed,
statRejected,
statInvalidTime,
statFailedNATSPublish,
statNATSDropped,
}
for i := 0; i < rules.Count(); i++ {
Expand Down
2 changes: 2 additions & 0 deletions filter/filter_medium_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ goodbye,host=gopher01
`processed{component="filter",name="particle"} 3`,
`rejected{component="filter",name="particle"} 1`,
`invalid_time{component="filter",name="particle"} 0`,
`failed_nats_publish{component="filter",name="particle"} 0`,
`nats_dropped{component="filter",name="particle"} 0`,
`triggered{component="filter",name="particle",rule="hello-subject"} 2`,
})
Expand Down Expand Up @@ -167,6 +168,7 @@ func TestInvalidTimeStamps(t *testing.T) {
`processed{component="filter",name="particle"} 4`,
`rejected{component="filter",name="particle"} 0`,
`invalid_time{component="filter",name="particle"} 2`,
`failed_nats_publish{component="filter",name="particle"} 0`,
`nats_dropped{component="filter",name="particle"} 0`,
`triggered{component="filter",name="particle",rule="hello-subject"} 2`,
})
Expand Down
14 changes: 12 additions & 2 deletions filter/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,18 +128,28 @@ func (w *worker) sendOff() {
for i, subject := range w.rules.Subjects() {
batch := w.batches[i]
if batch.Len() > 0 {
w.nc.Publish(subject, batch.Bytes())
w.publish(subject, batch.Bytes())
batch.Reset()
}
}

// send the junk batch
if w.junkBatch.Len() > 0 {
w.nc.Publish(w.junkSubject, w.junkBatch.Bytes())
w.publish(w.junkSubject, w.junkBatch.Bytes())
w.junkBatch.Reset()
}
}

func (w *worker) publish(subject string, data []byte) {
err := w.nc.Publish(subject, data)
if err != nil {
w.stats.Inc(statFailedNATSPublish)
if w.debug {
log.Printf("NATS publish failed: %v", err)
}
}
}

// Any realistic timestamp will be 18 or 19 characters long.
const minTsLen = 18
const maxTsLen = 19
Expand Down
1 change: 1 addition & 0 deletions spouttest/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ func TestEndToEnd(t *testing.T) {

// Check metrics published by monitor component.
expectedMetrics := regexp.MustCompile(`
failed_nats_publish{component="filter",name="filter"} 0
failed_writes{component="writer",influxdb_address="localhost",influxdb_dbname="test",influxdb_port="44501",name="writer"} 0
invalid_time{component="filter",name="filter"} 0
max_pending{component="writer",influxdb_address="localhost",influxdb_dbname="test",influxdb_port="44501",name="writer"} \d+
Expand Down

0 comments on commit 9be52aa

Please sign in to comment.