Skip to content
This repository has been archived by the owner on Mar 9, 2022. It is now read-only.

Commit

Permalink
filter: Expose k8s probe endpoints
Browse files Browse the repository at this point in the history
  • Loading branch information
mjs committed May 7, 2018
1 parent b473079 commit a536c49
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 12 deletions.
26 changes: 18 additions & 8 deletions filter/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"time"

"github.com/jumptrading/influx-spout/config"
"github.com/jumptrading/influx-spout/probes"
"github.com/jumptrading/influx-spout/stats"
"github.com/nats-io/go-nats"
)
Expand All @@ -43,9 +44,10 @@ const (
// NATS topic.
func StartFilter(conf *config.Config) (_ *Filter, err error) {
f := &Filter{
c: conf,
stop: make(chan struct{}),
wg: new(sync.WaitGroup),
c: conf,
stop: make(chan struct{}),
wg: new(sync.WaitGroup),
probes: probes.Listen(conf.ProbePort),
}
defer func() {
if err != nil {
Expand Down Expand Up @@ -104,6 +106,8 @@ func StartFilter(conf *config.Config) (_ *Filter, err error) {

log.Printf("filter subscribed to [%s] at %s with %d rules\n",
f.c.NATSSubject[0], f.c.NATSAddress, rules.Count())

f.probes.SetReady(true)
return f, nil
}

Expand Down Expand Up @@ -141,15 +145,19 @@ type natsConn interface {
// Filter is a struct that contains the configuration we are running with
// and the NATS bus connection
type Filter struct {
c *config.Config
nc natsConn
sub *nats.Subscription
wg *sync.WaitGroup
stop chan struct{}
c *config.Config
nc natsConn
sub *nats.Subscription
wg *sync.WaitGroup
probes probes.Probes
stop chan struct{}
}

// Stop shuts down goroutines and closes resources related to the filter.
func (f *Filter) Stop() {
f.probes.SetReady(false)
f.probes.SetAlive(false)

// Stop receiving lines to filter.
f.sub.Unsubscribe()

Expand All @@ -161,6 +169,8 @@ func (f *Filter) Stop() {
if f.nc != nil {
f.nc.Close()
}

f.probes.Close()
}

// startStatistician defines a goroutine that is responsible for
Expand Down
15 changes: 11 additions & 4 deletions filter/filter_medium_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
)

const natsPort = 44446
const probePort = 44447

func testConfig() *config.Config {
return &config.Config{
Expand All @@ -46,6 +47,7 @@ func testConfig() *config.Config {
Match: "hello",
Subject: "hello-subject",
}},
ProbePort: probePort,
}
}

Expand All @@ -55,8 +57,7 @@ func TestFilterWorker(t *testing.T) {

conf := testConfig()

filter, err := StartFilter(conf)
require.NoError(t, err)
filter := startFilter(t, conf)
defer filter.Stop()

nc, err := nats.Connect(conf.NATSAddress)
Expand Down Expand Up @@ -123,8 +124,7 @@ func TestInvalidTimeStamps(t *testing.T) {
conf := testConfig()
conf.MaxTimeDeltaSecs = 10

filter, err := StartFilter(conf)
require.NoError(t, err)
filter := startFilter(t, conf)
defer filter.Stop()

nc, err := nats.Connect(conf.NATSAddress)
Expand Down Expand Up @@ -174,3 +174,10 @@ func TestInvalidTimeStamps(t *testing.T) {
`triggered{component="filter",name="particle",rule="hello-subject"} 2`,
})
}

func startFilter(t *testing.T, conf *config.Config) *Filter {
filter, err := StartFilter(conf)
require.NoError(t, err)
spouttest.AssertReadyProbe(t, conf.ProbePort)
return filter
}

0 comments on commit a536c49

Please sign in to comment.