Skip to content
This repository has been archived by the owner on Mar 9, 2022. It is now read-only.

Commit

Permalink
probes: Added listener for k8s health probes
Browse files Browse the repository at this point in the history
  • Loading branch information
mjs committed May 1, 2018
1 parent 9e95499 commit ac7b76a
Show file tree
Hide file tree
Showing 2 changed files with 166 additions and 0 deletions.
92 changes: 92 additions & 0 deletions probes/probes.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
// Copyright 2018 Jump Trading
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package probes defines a simpler HTTP listener for Kubernetes style
// liveness and readiness probes.
package probes

import (
"fmt"
"net/http"
"sync"
"sync/atomic"
)

// Listen starts a simple HTTP listener for responding to Kubernetes
// liveness and readiness probes on the port specified. The returned
// Probes instance has methods for setting the liveness and readiness
// states.
//
// Liveness probes are served at /healthz.
// Readiness probes are served at /readyz.
func Listen(port int) *Probes {
p := &Probes{
alive: new(atomic.Value),
ready: new(atomic.Value),
server: &http.Server{
Addr: fmt.Sprintf(":%d", port),
},
}
p.alive.Store(true)
p.ready.Store(false)

mux := http.NewServeMux()
mux.HandleFunc("/healthz", newHandler(p.alive))
mux.HandleFunc("/readyz", newHandler(p.ready))
p.server.Handler = mux

p.wg.Add(1)
go func() {
defer p.wg.Done()
p.server.ListenAndServe()
}()

return p
}

// Probes contains a simple HTTP listener for serving Kubernetes
// liveness and readiness probes.
type Probes struct {
alive *atomic.Value
ready *atomic.Value
server *http.Server
wg sync.WaitGroup
}

// SetAlive set the liveness state - true means alive/healthy.
func (p *Probes) SetAlive(alive bool) {
p.alive.Store(alive)
}

// SetReady set the readiness state - true means ready.
func (p *Probes) SetReady(ready bool) {
p.ready.Store(ready)
}

// Close shuts down the probes listener. It blocks until the listener
// has stopped.
func (p *Probes) Close() {
p.server.Close()
p.wg.Wait()
}

func newHandler(value *atomic.Value) http.HandlerFunc {
return func(w http.ResponseWriter, _ *http.Request) {
if value.Load().(bool) {
w.WriteHeader(http.StatusOK)
return
}
http.Error(w, http.StatusText(http.StatusServiceUnavailable), http.StatusServiceUnavailable)
}
}
74 changes: 74 additions & 0 deletions probes/probes_medium_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
// Copyright 2018 Jump Trading
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// +build medium

package probes_test

import (
"fmt"
"net/http"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/jumptrading/influx-spout/probes"
)

const probesPort = 44450

func TestProbes(t *testing.T) {
p := probes.Listen(probesPort)
defer p.Close()

// Starting state is alive but not ready.
assertAlive(t)
assertNotReady(t)

// Toggle alive.
p.SetAlive(false)
assertNotAlive(t)
p.SetAlive(true)
assertAlive(t)

// Toggle ready.
p.SetReady(true)
assertReady(t)
p.SetReady(false)
assertNotReady(t)
}

func assertAlive(t *testing.T) {
assertProbe(t, "healthz", http.StatusOK)
}

func assertNotAlive(t *testing.T) {
assertProbe(t, "healthz", http.StatusServiceUnavailable)
}

func assertReady(t *testing.T) {
assertProbe(t, "readyz", http.StatusOK)
}

func assertNotReady(t *testing.T) {
assertProbe(t, "readyz", http.StatusServiceUnavailable)
}

func assertProbe(t *testing.T, path string, expectedStatus int) {
url := fmt.Sprintf("http://localhost:%d/%s", probesPort, path)
resp, err := http.Get(url)
require.NoError(t, err)
assert.Equal(t, expectedStatus, resp.StatusCode)
}

0 comments on commit ac7b76a

Please sign in to comment.