From 543bb9b8d81c263a7208c66df76ca26738a022d3 Mon Sep 17 00:00:00 2001 From: Menno Finlay-Smits Date: Tue, 1 May 2018 12:25:15 +1200 Subject: [PATCH] filter: Allow for multiple filters The filter now subscribes to its inbound NATS subject using a queue group so that multiple filters can be run at the same time. --- README.md | 6 ++++-- filter/filter.go | 8 ++++++-- 2 files changed, 10 insertions(+), 4 deletions(-) diff --git a/README.md b/README.md index 3e4e6c7..2bc56f7 100644 --- a/README.md +++ b/README.md @@ -183,8 +183,10 @@ nats_subject_monitor = "influx-spout-monitor" ### Filter -The filter is responsible for filtering measurements published to NATS by the -listener, and forwarding them on to other NATS subjects. +The filter is responsible for filtering measurements published to NATS +by the listener, and forwarding them on to other NATS +subjects. Multiple filters may be run to distribute the filtering +workload across machines. The supported configuration options for the filter mode follow. Defaults are shown. diff --git a/filter/filter.go b/filter/filter.go index c2e80e4..0a4adc4 100644 --- a/filter/filter.go +++ b/filter/filter.go @@ -82,7 +82,11 @@ func StartFilter(conf *config.Config) (_ *Filter, err error) { go w.run(jobs, f.stop, f.wg) } - f.sub, err = f.nc.Subscribe(f.c.NATSSubject[0], func(msg *nats.Msg) { + // A fixed queue name is used to avoid a potential source of + // misconfiguration. Queue groups are tied to the subject being + // subscribed to and it's unlikely we'll want different queue + // groups for a single NATS subject. + f.sub, err = f.nc.QueueSubscribe(f.c.NATSSubject[0], "filter", func(msg *nats.Msg) { if conf.Debug { log.Printf("filter received %d bytes", len(msg.Data)) } @@ -130,7 +134,7 @@ func initStats(rules *RuleSet) *stats.Stats { // natsConn allows a mock nats.Conn to be substituted in during tests. type natsConn interface { Publish(string, []byte) error - Subscribe(string, nats.MsgHandler) (*nats.Subscription, error) + QueueSubscribe(string, string, nats.MsgHandler) (*nats.Subscription, error) Close() }