Skip to content
This repository has been archived by the owner on Mar 9, 2022. It is now read-only.

filter: Allow for multiple filters #74

Merged
merged 1 commit into from
May 1, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -183,8 +183,10 @@ nats_subject_monitor = "influx-spout-monitor"

### Filter

The filter is responsible for filtering measurements published to NATS by the
listener, and forwarding them on to other NATS subjects.
The filter is responsible for filtering measurements published to NATS
by the listener, and forwarding them on to other NATS
subjects. Multiple filters may be run to distribute the filtering
workload across machines.

The supported configuration options for the filter mode follow. Defaults are
shown.
Expand Down
8 changes: 6 additions & 2 deletions filter/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,11 @@ func StartFilter(conf *config.Config) (_ *Filter, err error) {
go w.run(jobs, f.stop, f.wg)
}

f.sub, err = f.nc.Subscribe(f.c.NATSSubject[0], func(msg *nats.Msg) {
// A fixed queue name is used to avoid a potential source of
// misconfiguration. Queue groups are tied to the subject being
// subscribed to and it's unlikely we'll want different queue
// groups for a single NATS subject.
f.sub, err = f.nc.QueueSubscribe(f.c.NATSSubject[0], "filter", func(msg *nats.Msg) {
if conf.Debug {
log.Printf("filter received %d bytes", len(msg.Data))
}
Expand Down Expand Up @@ -130,7 +134,7 @@ func initStats(rules *RuleSet) *stats.Stats {
// natsConn allows a mock nats.Conn to be substituted in during tests.
type natsConn interface {
Publish(string, []byte) error
Subscribe(string, nats.MsgHandler) (*nats.Subscription, error)
QueueSubscribe(string, string, nats.MsgHandler) (*nats.Subscription, error)
Close()
}

Expand Down