Skip to content
This repository has been archived by the owner on Mar 9, 2022. It is now read-only.

Commit

Permalink
writer: Expose k8s probe endpoints
Browse files Browse the repository at this point in the history
  • Loading branch information
mjs committed May 7, 2018
1 parent de9b218 commit 3fa8edd
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 1 deletion.
10 changes: 10 additions & 0 deletions writer/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (

"github.com/jumptrading/influx-spout/config"
"github.com/jumptrading/influx-spout/filter"
"github.com/jumptrading/influx-spout/probes"
"github.com/jumptrading/influx-spout/stats"
)

Expand All @@ -54,6 +55,7 @@ type Writer struct {
rules *filter.RuleSet
stats *stats.Stats
wg sync.WaitGroup
probes probes.Probes
stop chan struct{}
}

Expand All @@ -66,6 +68,7 @@ func StartWriter(c *config.Config) (_ *Writer, err error) {
batchMaxBytes: c.BatchMaxMB * 1024 * 1024,
batchMaxAge: time.Duration(c.BatchMaxSecs) * time.Second,
stats: stats.New(statReceived, statWriteRequests, statFailedWrites, statMaxPending),
probes: probes.Listen(c.ProbePort),
stop: make(chan struct{}),
}
defer func() {
Expand Down Expand Up @@ -124,18 +127,25 @@ func StartWriter(c *config.Config) (_ *Writer, err error) {
log.Printf("POST timeout: %ds", c.WriteTimeoutSecs)
log.Printf("maximum NATS subject size: %dMB", c.NATSPendingMaxMB)

w.probes.SetReady(true)

return w, nil
}

// Stop aborts all goroutines belonging to the Writer and closes its
// connection to NATS. It will be block until all Writer goroutines
// have stopped.
func (w *Writer) Stop() {
w.probes.SetReady(false)
w.probes.SetAlive(false)

close(w.stop)
w.wg.Wait()
if w.nc != nil {
w.nc.Close()
}

w.probes.Close()
}

func (w *Writer) worker(jobs <-chan *nats.Msg) {
Expand Down
9 changes: 8 additions & 1 deletion writer/writer_medium_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,9 @@ import (
"github.com/jumptrading/influx-spout/spouttest"
)

const influxPort = 44445
const natsPort = 44443
const influxPort = 44445
const probePort = 44446

var natsAddress = fmt.Sprintf("nats://127.0.0.1:%d", natsPort)

Expand All @@ -54,6 +55,7 @@ func testConfig() *config.Config {
Port: influxPort,
Workers: 96,
NATSPendingMaxMB: 32,
ProbePort: probePort,
}
}

Expand Down Expand Up @@ -302,6 +304,11 @@ func runGnatsd(t FatalTestingT) (*nats.Conn, func()) {
func startWriter(t require.TestingT, conf *config.Config) *Writer {
w, err := StartWriter(conf)
require.NoError(t, err)
if !spouttest.CheckReadyProbe(conf.ProbePort) {
w.Stop()
t.Errorf("writer not ready")
t.FailNow()
}
return w
}

Expand Down

0 comments on commit 3fa8edd

Please sign in to comment.