Skip to content
This repository has been archived by the owner on Mar 9, 2022. It is now read-only.

Commit

Permalink
monitor: Expose k8s probe endpoints
Browse files Browse the repository at this point in the history
  • Loading branch information
mjs committed May 7, 2018
1 parent 3fa8edd commit 55bf27b
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 21 deletions.
31 changes: 16 additions & 15 deletions monitor/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,19 +21,21 @@ import (
"net/http"
"sync"

"github.com/nats-io/go-nats"

"github.com/jumptrading/influx-spout/config"
"github.com/jumptrading/influx-spout/probes"
"github.com/jumptrading/influx-spout/prometheus"
"github.com/nats-io/go-nats"
)

// Start initialises, starts and returns a new Monitor instance based
// on the configuration supplies.
func Start(conf *config.Config) (_ *Monitor, err error) {
m := &Monitor{
c: conf,
ready: make(chan struct{}),
stop: make(chan struct{}),
metrics: prometheus.NewMetricSet(),
probes: probes.Listen(conf.ProbePort),
}
defer func() {
if err != nil {
Expand Down Expand Up @@ -66,25 +68,22 @@ func Start(conf *config.Config) (_ *Monitor, err error) {
// runtime statistics from the other influx-spout components and
// makes them available via a HTTP endpoint in Prometheus format.
type Monitor struct {
c *config.Config
nc *nats.Conn
sub *nats.Subscription
wg sync.WaitGroup
ready chan struct{}
stop chan struct{}
c *config.Config
nc *nats.Conn
sub *nats.Subscription
wg sync.WaitGroup
stop chan struct{}
probes probes.Probes

mu sync.Mutex
metrics *prometheus.MetricSet
}

// Ready returns a channel which is closed once the monitor is
// actually listening for HTTP metrics requests.
func (m *Monitor) Ready() <-chan struct{} {
return m.ready
}

// Stop shuts down goroutines and closes resources related to the filter.
func (m *Monitor) Stop() {
m.probes.SetReady(false)
m.probes.SetAlive(false)

// Stop receiving lines from NATS.
m.sub.Unsubscribe()

Expand All @@ -96,6 +95,8 @@ func (m *Monitor) Stop() {
if m.nc != nil {
m.nc.Close()
}

m.probes.Close()
}

func (m *Monitor) natsConnect() (*nats.Conn, error) {
Expand Down Expand Up @@ -125,7 +126,7 @@ func (m *Monitor) serveHTTP() {
}

go func() {
close(m.ready)
m.probes.SetReady(true)
err := server.ListenAndServe()
if err == nil || err == http.ErrServerClosed {
return
Expand Down
9 changes: 3 additions & 6 deletions monitor/monitor_medium_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (

const natsPort = 44447
const httpPort = 44448
const probePort = 44449

var natsAddress = fmt.Sprintf("nats://127.0.0.1:%d", natsPort)

Expand All @@ -43,6 +44,7 @@ func testConfig() *config.Config {
NATSAddress: natsAddress,
NATSSubjectMonitor: "monitor-test-monitor",
Port: httpPort,
ProbePort: probePort,
}
}

Expand All @@ -55,12 +57,7 @@ func TestMonitor(t *testing.T) {
mon, err := monitor.Start(conf)
require.NoError(t, err)
defer mon.Stop()

select {
case <-mon.Ready():
case <-time.After(spouttest.LongWait):
t.Fatal("timed out waiting for monitor to be ready")
}
spouttest.AssertReadyProbe(t, conf.ProbePort)

publish := func(data []byte) {
err := nc.Publish(conf.NATSSubjectMonitor, data)
Expand Down

0 comments on commit 55bf27b

Please sign in to comment.