This repository has been archived by the owner on Mar 9, 2022. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 6
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #57 from mjs/monitor-component
Introduce monitor component
- Loading branch information
Showing
8 changed files
with
567 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,148 @@ | ||
// Copyright 2018 Jump Trading | ||
// | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
|
||
// Package monitor defines the influx-spount monitor component. | ||
package monitor | ||
|
||
import ( | ||
"fmt" | ||
"log" | ||
"net/http" | ||
"sync" | ||
|
||
"github.com/jumptrading/influx-spout/config" | ||
"github.com/jumptrading/influx-spout/prometheus" | ||
"github.com/nats-io/go-nats" | ||
) | ||
|
||
// Start initialises, starts and returns a new Monitor instance based | ||
// on the configuration supplies. | ||
func Start(conf *config.Config) (_ *Monitor, err error) { | ||
m := &Monitor{ | ||
c: conf, | ||
ready: make(chan struct{}), | ||
stop: make(chan struct{}), | ||
metrics: prometheus.NewMetricSet(), | ||
} | ||
defer func() { | ||
if err != nil { | ||
m.Stop() | ||
} | ||
}() | ||
|
||
m.nc, err = m.natsConnect() | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
m.sub, err = m.nc.Subscribe(m.c.NATSSubjectMonitor, m.receiveMetrics) | ||
if err != nil { | ||
return nil, fmt.Errorf("NATS: failed to subscribe: %v", err) | ||
} | ||
|
||
m.wg.Add(1) | ||
go m.serveHTTP() | ||
|
||
log.Printf("monitor subscribed to [%s] at %s - serving HTTP on port %d", | ||
m.c.NATSSubjectMonitor, m.c.NATSAddress, m.c.Port) | ||
return m, nil | ||
} | ||
|
||
// Monitor defines an influx-spout component which accumulates | ||
// runtime statistics from the other influx-spout components and | ||
// makes them available via a HTTP endpoint in Prometheus format. | ||
type Monitor struct { | ||
c *config.Config | ||
nc *nats.Conn | ||
sub *nats.Subscription | ||
wg sync.WaitGroup | ||
ready chan struct{} | ||
stop chan struct{} | ||
|
||
mu sync.Mutex | ||
metrics *prometheus.MetricSet | ||
} | ||
|
||
// Ready returns a channel which is closed once the monitor is | ||
// actually listening for HTTP metrics requests. | ||
func (m *Monitor) Ready() <-chan struct{} { | ||
return m.ready | ||
} | ||
|
||
// Stop shuts down goroutines and closes resources related to the filter. | ||
func (m *Monitor) Stop() { | ||
// Stop receiving lines from NATS. | ||
m.sub.Unsubscribe() | ||
|
||
// Shut down goroutines. | ||
close(m.stop) | ||
m.wg.Wait() | ||
|
||
// Close the connection to NATS. | ||
if m.nc != nil { | ||
m.nc.Close() | ||
} | ||
} | ||
|
||
func (m *Monitor) natsConnect() (*nats.Conn, error) { | ||
nc, err := nats.Connect(m.c.NATSAddress) | ||
if err != nil { | ||
return nil, fmt.Errorf("NATS: failed to connect: %v", err) | ||
} | ||
return nc, nil | ||
} | ||
|
||
func (m *Monitor) serveHTTP() { | ||
defer m.wg.Done() | ||
|
||
mux := http.NewServeMux() | ||
mux.HandleFunc("/metrics", func(w http.ResponseWriter, r *http.Request) { | ||
r.Body.Close() | ||
w.Header().Set("Content-Type", "text/plain") | ||
|
||
defer m.mu.Unlock() | ||
m.mu.Lock() | ||
w.Write(m.metrics.ToBytes()) | ||
}) | ||
|
||
server := &http.Server{ | ||
Addr: fmt.Sprintf(":%d", m.c.Port), | ||
Handler: mux, | ||
} | ||
|
||
go func() { | ||
close(m.ready) | ||
err := server.ListenAndServe() | ||
if err == nil || err == http.ErrServerClosed { | ||
return | ||
} | ||
log.Fatal(err) | ||
}() | ||
|
||
// Close the server if the stop channel is closed. | ||
<-m.stop | ||
server.Close() | ||
} | ||
|
||
func (m *Monitor) receiveMetrics(msg *nats.Msg) { | ||
newMetrics, err := prometheus.ParseMetrics(msg.Data) | ||
if err != nil { | ||
log.Printf("invalid metrics received: %v", err) | ||
return | ||
} | ||
|
||
defer m.mu.Unlock() | ||
m.mu.Lock() | ||
m.metrics.UpdateFromSet(newMetrics) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,170 @@ | ||
// Copyright 2018 Jump Trading | ||
// | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
|
||
// +build medium | ||
|
||
package monitor_test | ||
|
||
import ( | ||
"fmt" | ||
"io/ioutil" | ||
"net/http" | ||
"testing" | ||
"time" | ||
|
||
"github.com/nats-io/go-nats" | ||
"github.com/stretchr/testify/require" | ||
|
||
"github.com/jumptrading/influx-spout/config" | ||
"github.com/jumptrading/influx-spout/monitor" | ||
"github.com/jumptrading/influx-spout/prometheus" | ||
"github.com/jumptrading/influx-spout/spouttest" | ||
) | ||
|
||
const natsPort = 44447 | ||
const httpPort = 44448 | ||
|
||
var natsAddress = fmt.Sprintf("nats://127.0.0.1:%d", natsPort) | ||
|
||
func testConfig() *config.Config { | ||
return &config.Config{ | ||
Name: "nats.server", | ||
NATSAddress: natsAddress, | ||
NATSSubjectMonitor: "monitor-test-monitor", | ||
Port: httpPort, | ||
} | ||
} | ||
|
||
func TestMonitor(t *testing.T) { | ||
nc, stopNats := runGnatsd(t) | ||
defer stopNats() | ||
|
||
conf := testConfig() | ||
|
||
mon, err := monitor.Start(conf) | ||
require.NoError(t, err) | ||
defer mon.Stop() | ||
|
||
select { | ||
case <-mon.Ready(): | ||
case <-time.After(spouttest.LongWait): | ||
t.Fatal("timed out waiting for monitor to be ready") | ||
} | ||
|
||
publish := func(data []byte) { | ||
err := nc.Publish(conf.NATSSubjectMonitor, data) | ||
require.NoError(t, err) | ||
} | ||
|
||
expected := prometheus.NewMetricSet() | ||
|
||
// Send a metric to the monitor and see it included at the | ||
// monitor's metric endpoint. | ||
m0 := &prometheus.Metric{ | ||
Name: []byte("foo"), | ||
Labels: prometheus.LabelPairs{ | ||
{ | ||
Name: []byte("host"), | ||
Value: []byte("nyc01"), | ||
}, | ||
{ | ||
Name: []byte("land"), | ||
Value: []byte("ho"), | ||
}, | ||
}, | ||
Value: 42, | ||
Milliseconds: 11111111, | ||
} | ||
publish(m0.ToBytes()) | ||
expected.Update(m0) | ||
|
||
assertMetrics(t, expected) | ||
|
||
// Send another update with 2 metrics to the monitor and see them | ||
// included. | ||
nextUpdate := prometheus.NewMetricSet() | ||
nextUpdate.Update(&prometheus.Metric{ | ||
Name: []byte("foo"), | ||
Labels: prometheus.LabelPairs{ | ||
{ | ||
Name: []byte("host"), | ||
Value: []byte("nyc01"), | ||
}, | ||
{ | ||
Name: []byte("land"), | ||
Value: []byte("ho"), | ||
}, | ||
}, | ||
Value: 99, | ||
Milliseconds: 22222222, | ||
}) | ||
nextUpdate.Update(&prometheus.Metric{ | ||
Name: []byte("bar"), | ||
Labels: prometheus.LabelPairs{ | ||
{ | ||
Name: []byte("host"), | ||
Value: []byte("nyc02"), | ||
}, | ||
}, | ||
Value: 1024, | ||
Milliseconds: 33333333, | ||
}) | ||
publish(nextUpdate.ToBytes()) | ||
expected.UpdateFromSet(nextUpdate) | ||
|
||
assertMetrics(t, expected) | ||
} | ||
|
||
func runGnatsd(t *testing.T) (*nats.Conn, func()) { | ||
gnatsd := spouttest.RunGnatsd(natsPort) | ||
|
||
nc, err := nats.Connect(natsAddress) | ||
if err != nil { | ||
gnatsd.Shutdown() | ||
t.Fatalf("NATS connect failed: %v", err) | ||
} | ||
|
||
return nc, func() { | ||
nc.Close() | ||
gnatsd.Shutdown() | ||
} | ||
} | ||
|
||
func assertMetrics(t *testing.T, expected *prometheus.MetricSet) { | ||
var actual *prometheus.MetricSet | ||
|
||
for try := 0; try < 10; try++ { | ||
resp, err := http.Get(fmt.Sprintf("http://localhost:%d/metrics", httpPort)) | ||
require.NoError(t, err) | ||
defer resp.Body.Close() | ||
|
||
require.Equal(t, resp.StatusCode, 200) | ||
require.Equal(t, "text/plain", resp.Header.Get("Content-Type")) | ||
|
||
body, err := ioutil.ReadAll(resp.Body) | ||
require.NoError(t, err) | ||
|
||
actual, err = prometheus.ParseMetrics(body) | ||
require.NoError(t, err) | ||
|
||
if string(expected.ToBytes()) == string(actual.ToBytes()) { | ||
return // Success | ||
} | ||
|
||
// Metrics may not have been processed yet - sleep and try again. | ||
time.Sleep(250 * time.Millisecond) | ||
} | ||
|
||
t.Fatalf("Failed to see expected metrics. Wanted:\n%s\nLast saw:\n%s", expected.ToBytes(), actual.ToBytes()) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.