Skip to content
This repository has been archived by the owner on Mar 9, 2022. It is now read-only.

Commit

Permalink
filter: Allow for multiple filters
Browse files Browse the repository at this point in the history
The filter now subscribes to its inbound NATS subject using a queue
group so that multiple filters can be run at the same time.
  • Loading branch information
mjs committed May 1, 2018
1 parent 393a8c3 commit 543bb9b
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 4 deletions.
6 changes: 4 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -183,8 +183,10 @@ nats_subject_monitor = "influx-spout-monitor"

### Filter

The filter is responsible for filtering measurements published to NATS by the
listener, and forwarding them on to other NATS subjects.
The filter is responsible for filtering measurements published to NATS
by the listener, and forwarding them on to other NATS
subjects. Multiple filters may be run to distribute the filtering
workload across machines.

The supported configuration options for the filter mode follow. Defaults are
shown.
Expand Down
8 changes: 6 additions & 2 deletions filter/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,11 @@ func StartFilter(conf *config.Config) (_ *Filter, err error) {
go w.run(jobs, f.stop, f.wg)
}

f.sub, err = f.nc.Subscribe(f.c.NATSSubject[0], func(msg *nats.Msg) {
// A fixed queue name is used to avoid a potential source of
// misconfiguration. Queue groups are tied to the subject being
// subscribed to and it's unlikely we'll want different queue
// groups for a single NATS subject.
f.sub, err = f.nc.QueueSubscribe(f.c.NATSSubject[0], "filter", func(msg *nats.Msg) {
if conf.Debug {
log.Printf("filter received %d bytes", len(msg.Data))
}
Expand Down Expand Up @@ -130,7 +134,7 @@ func initStats(rules *RuleSet) *stats.Stats {
// natsConn allows a mock nats.Conn to be substituted in during tests.
type natsConn interface {
Publish(string, []byte) error
Subscribe(string, nats.MsgHandler) (*nats.Subscription, error)
QueueSubscribe(string, string, nats.MsgHandler) (*nats.Subscription, error)
Close()
}

Expand Down

0 comments on commit 543bb9b

Please sign in to comment.