Skip to content
This repository has been archived by the owner on Mar 9, 2022. It is now read-only.

Commit

Permalink
filter: Use nats_pending_max_mb
Browse files Browse the repository at this point in the history
As for the writer, the filter now uses the `nats_pending_max_mb`
configuration option to set the maximum pending buffer size for the
incoming NATS subject. Previously it was using the default size of
65MB. Using a higher number may help to prevent dropped messages in
the filter. The default is 200MB.
  • Loading branch information
mjs committed Apr 24, 2018
1 parent f836098 commit 75a5c51
Show file tree
Hide file tree
Showing 4 changed files with 11 additions and 2 deletions.
5 changes: 5 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,11 @@ nats_subject_junkyard = "influx-spout-junk"
# by the monitor).
nats_subject_monitor = "influx-spout-monitor"

# The maximum size that the pending buffer for the NATS subject that the filter
# is reading from may become (in megabytes). Measurements will be dropped if
# this limit is reached.
nats_pending_max_mb = 200

# The number of filter workers to spawn.
workers = 8

Expand Down
3 changes: 3 additions & 0 deletions filter/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,9 @@ func StartFilter(conf *config.Config) (_ *Filter, err error) {
if err != nil {
return nil, fmt.Errorf("NATS: failed to subscribe: %v", err)
}
if err := f.sub.SetPendingLimits(-1, conf.NATSPendingMaxMB*1024*1024); err != nil {
return nil, fmt.Errorf("NATS: failed to set pending limits: %v", err)
}

f.wg.Add(1)
go f.startStatistician(stats, rules)
Expand Down
1 change: 1 addition & 0 deletions filter/filter_medium_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ func testConfig() *config.Config {
NATSSubject: []string{"filter-test"},
NATSSubjectMonitor: "filter-test-monitor",
NATSSubjectJunkyard: "filter-junkyard",
NATSPendingMaxMB: 32,
Workers: 1,
MaxTimeDeltaSecs: 600,
Rule: []config.Rule{{
Expand Down
4 changes: 2 additions & 2 deletions writer/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,10 +104,10 @@ func StartWriter(c *config.Config) (_ *Writer, err error) {
jobs <- msg
})
if err != nil {
return nil, fmt.Errorf("subscription for %q failed: %v", subject, err)
return nil, fmt.Errorf("NATS: subscription for %q failed: %v", subject, err)
}
if err := sub.SetPendingLimits(-1, maxPendingBytes); err != nil {
return nil, fmt.Errorf("failed to set pending limits: %v", err)
return nil, fmt.Errorf("NATS: failed to set pending limits: %v", err)
}

w.wg.Add(1)
Expand Down

0 comments on commit 75a5c51

Please sign in to comment.