Skip to content
This repository has been archived by the owner on Mar 9, 2022. It is now read-only.

Commit

Permalink
Merge pull request #74 from mjs/multiple-filters
Browse files Browse the repository at this point in the history
filter: Allow for multiple filters
  • Loading branch information
oplehto authored May 1, 2018
2 parents 2ba1aef + 543bb9b commit 9e95499
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 4 deletions.
6 changes: 4 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -183,8 +183,10 @@ nats_subject_monitor = "influx-spout-monitor"

### Filter

The filter is responsible for filtering measurements published to NATS by the
listener, and forwarding them on to other NATS subjects.
The filter is responsible for filtering measurements published to NATS
by the listener, and forwarding them on to other NATS
subjects. Multiple filters may be run to distribute the filtering
workload across machines.

The supported configuration options for the filter mode follow. Defaults are
shown.
Expand Down
8 changes: 6 additions & 2 deletions filter/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,11 @@ func StartFilter(conf *config.Config) (_ *Filter, err error) {
go w.run(jobs, f.stop, f.wg)
}

f.sub, err = f.nc.Subscribe(f.c.NATSSubject[0], func(msg *nats.Msg) {
// A fixed queue name is used to avoid a potential source of
// misconfiguration. Queue groups are tied to the subject being
// subscribed to and it's unlikely we'll want different queue
// groups for a single NATS subject.
f.sub, err = f.nc.QueueSubscribe(f.c.NATSSubject[0], "filter", func(msg *nats.Msg) {
if conf.Debug {
log.Printf("filter received %d bytes", len(msg.Data))
}
Expand Down Expand Up @@ -130,7 +134,7 @@ func initStats(rules *RuleSet) *stats.Stats {
// natsConn allows a mock nats.Conn to be substituted in during tests.
type natsConn interface {
Publish(string, []byte) error
Subscribe(string, nats.MsgHandler) (*nats.Subscription, error)
QueueSubscribe(string, string, nats.MsgHandler) (*nats.Subscription, error)
Close()
}

Expand Down

0 comments on commit 9e95499

Please sign in to comment.