From 3fa8eddc41b0ce45d521b9fb37132b47628784da Mon Sep 17 00:00:00 2001 From: Menno Finlay-Smits Date: Tue, 8 May 2018 11:36:03 +1200 Subject: [PATCH] writer: Expose k8s probe endpoints --- writer/writer.go | 10 ++++++++++ writer/writer_medium_test.go | 9 ++++++++- 2 files changed, 18 insertions(+), 1 deletion(-) diff --git a/writer/writer.go b/writer/writer.go index 4a18539..e345211 100644 --- a/writer/writer.go +++ b/writer/writer.go @@ -33,6 +33,7 @@ import ( "github.com/jumptrading/influx-spout/config" "github.com/jumptrading/influx-spout/filter" + "github.com/jumptrading/influx-spout/probes" "github.com/jumptrading/influx-spout/stats" ) @@ -54,6 +55,7 @@ type Writer struct { rules *filter.RuleSet stats *stats.Stats wg sync.WaitGroup + probes probes.Probes stop chan struct{} } @@ -66,6 +68,7 @@ func StartWriter(c *config.Config) (_ *Writer, err error) { batchMaxBytes: c.BatchMaxMB * 1024 * 1024, batchMaxAge: time.Duration(c.BatchMaxSecs) * time.Second, stats: stats.New(statReceived, statWriteRequests, statFailedWrites, statMaxPending), + probes: probes.Listen(c.ProbePort), stop: make(chan struct{}), } defer func() { @@ -124,6 +127,8 @@ func StartWriter(c *config.Config) (_ *Writer, err error) { log.Printf("POST timeout: %ds", c.WriteTimeoutSecs) log.Printf("maximum NATS subject size: %dMB", c.NATSPendingMaxMB) + w.probes.SetReady(true) + return w, nil } @@ -131,11 +136,16 @@ func StartWriter(c *config.Config) (_ *Writer, err error) { // connection to NATS. It will be block until all Writer goroutines // have stopped. func (w *Writer) Stop() { + w.probes.SetReady(false) + w.probes.SetAlive(false) + close(w.stop) w.wg.Wait() if w.nc != nil { w.nc.Close() } + + w.probes.Close() } func (w *Writer) worker(jobs <-chan *nats.Msg) { diff --git a/writer/writer_medium_test.go b/writer/writer_medium_test.go index ee242a4..75149a5 100644 --- a/writer/writer_medium_test.go +++ b/writer/writer_medium_test.go @@ -33,8 +33,9 @@ import ( "github.com/jumptrading/influx-spout/spouttest" ) -const influxPort = 44445 const natsPort = 44443 +const influxPort = 44445 +const probePort = 44446 var natsAddress = fmt.Sprintf("nats://127.0.0.1:%d", natsPort) @@ -54,6 +55,7 @@ func testConfig() *config.Config { Port: influxPort, Workers: 96, NATSPendingMaxMB: 32, + ProbePort: probePort, } } @@ -302,6 +304,11 @@ func runGnatsd(t FatalTestingT) (*nats.Conn, func()) { func startWriter(t require.TestingT, conf *config.Config) *Writer { w, err := StartWriter(conf) require.NoError(t, err) + if !spouttest.CheckReadyProbe(conf.ProbePort) { + w.Stop() + t.Errorf("writer not ready") + t.FailNow() + } return w }