Skip to content

Commit

Permalink
metric unregistration on route removal, fixed multi-ips as visitor la…
Browse files Browse the repository at this point in the history
…bel detected from x headers
  • Loading branch information
yusing committed Nov 9, 2024
1 parent a1d1325 commit 6194bac
Show file tree
Hide file tree
Showing 7 changed files with 100 additions and 48 deletions.
4 changes: 2 additions & 2 deletions internal/api/handler.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package api

import (
"fmt"
"net"
"net/http"

Expand All @@ -20,7 +19,7 @@ func NewServeMux() ServeMux {
}

func (mux ServeMux) HandleFunc(method, endpoint string, handler http.HandlerFunc) {
mux.ServeMux.HandleFunc(fmt.Sprintf("%s %s", method, endpoint), checkHost(rateLimited(handler)))
mux.ServeMux.HandleFunc(method+" "+endpoint, checkHost(rateLimited(handler)))
}

func NewHandler() http.Handler {
Expand Down Expand Up @@ -55,6 +54,7 @@ func checkHost(f http.HandlerFunc) http.HandlerFunc {
http.Error(w, "forbidden", http.StatusForbidden)
return
}
LogInfo(r).Interface("headers", r.Header).Msg("API request")
f(w, r)
}
}
Expand Down
26 changes: 24 additions & 2 deletions internal/docker/idlewatcher/waker.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,11 @@ import (
"sync/atomic"
"time"

"github.com/prometheus/client_golang/prometheus"
"github.com/yusing/go-proxy/internal/common"
. "github.com/yusing/go-proxy/internal/docker/idlewatcher/types"
E "github.com/yusing/go-proxy/internal/error"
"github.com/yusing/go-proxy/internal/metrics"
gphttp "github.com/yusing/go-proxy/internal/net/http"
net "github.com/yusing/go-proxy/internal/net/types"
"github.com/yusing/go-proxy/internal/proxy/entry"
Expand All @@ -20,6 +23,7 @@ type waker struct {
rp *gphttp.ReverseProxy
stream net.Stream
hc health.HealthChecker
metric *metrics.Gauge

ready atomic.Bool
}
Expand Down Expand Up @@ -53,6 +57,13 @@ func newWaker(providerSubTask task.Task, entry entry.Entry, rp *gphttp.ReversePr
default:
panic("both nil")
}

if common.PrometheusEnabled {
m := metrics.GetServiceMetrics()
fqn := providerSubTask.Parent().Name() + "/" + entry.TargetName()
waker.metric = m.HealthStatus.With(metrics.HealthMetricLabels(fqn))
waker.metric.Set(float64(watcher.Status()))
}
return watcher, nil
}

Expand All @@ -68,8 +79,11 @@ func NewStreamWaker(providerSubTask task.Task, entry entry.Entry, stream net.Str
// Start implements health.HealthMonitor.
func (w *Watcher) Start(routeSubTask task.Task) E.Error {
routeSubTask.Finish("ignored")
w.task.OnCancel("stop route", func() {
w.task.OnCancel("stop route and cleanup", func() {
routeSubTask.Parent().Finish(w.task.FinishCause())
if w.metric != nil {
prometheus.Unregister(w.metric)
}
})
return nil
}
Expand All @@ -96,8 +110,16 @@ func (w *Watcher) Uptime() time.Duration {
return 0
}

// Status implements health.HealthMonitor.
func (w *Watcher) Status() health.Status {
status := w.getStatusUpdateReady()
if w.metric != nil {
w.metric.Set(float64(status))
}
return status
}

// Status implements health.HealthMonitor.
func (w *Watcher) getStatusUpdateReady() health.Status {
if !w.ContainerRunning {
return health.StatusNapping
}
Expand Down
12 changes: 6 additions & 6 deletions internal/metrics/metric.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,12 @@ import "github.com/prometheus/client_golang/prometheus"

type (
Counter struct {
collector prometheus.Counter
mv *prometheus.CounterVec
collector prometheus.Counter
}
Gauge struct {
collector prometheus.Gauge
mv *prometheus.GaugeVec
collector prometheus.Gauge
}
Labels interface {
toPromLabels() prometheus.Labels
Expand Down Expand Up @@ -52,8 +52,8 @@ func (c *Counter) Inc() {
c.collector.Inc()
}

func (c *Counter) With(l Labels) prometheus.Counter {
return c.mv.With(l.toPromLabels())
func (c *Counter) With(l Labels) *Counter {
return &Counter{mv: c.mv, collector: c.mv.With(l.toPromLabels())}
}

func (g *Gauge) Collect(ch chan<- prometheus.Metric) {
Expand All @@ -68,6 +68,6 @@ func (g *Gauge) Set(v float64) {
g.collector.Set(v)
}

func (g *Gauge) With(l Labels) prometheus.Gauge {
return g.mv.With(l.toPromLabels())
func (g *Gauge) With(l Labels) *Gauge {
return &Gauge{mv: g.mv, collector: g.mv.With(l.toPromLabels())}
}
48 changes: 39 additions & 9 deletions internal/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,24 @@ import (
"github.com/yusing/go-proxy/internal/common"
)

type RouteMetrics struct {
HTTPReqTotal,
HTTP2xx3xx,
HTTP4xx,
HTTP5xx *Counter
HTTPReqElapsed *Gauge
HealthStatus *Gauge
}
type (
RouteMetrics struct {
HTTPReqTotal,
HTTP2xx3xx,
HTTP4xx,
HTTP5xx *Counter
HTTPReqElapsed *Gauge
}

ServiceMetrics struct {
HealthStatus *Gauge
}
)

var rm RouteMetrics
var (
rm RouteMetrics
sm ServiceMetrics
)

const (
routerNamespace = "router"
Expand All @@ -29,10 +37,27 @@ func GetRouteMetrics() *RouteMetrics {
return &rm
}

func GetServiceMetrics() *ServiceMetrics {
return &sm
}

func (rm *RouteMetrics) UnregisterService(service string) {
lbls := &HTTPRouteMetricLabels{Service: service}
prometheus.Unregister(rm.HTTP2xx3xx.With(lbls))
prometheus.Unregister(rm.HTTP4xx.With(lbls))
prometheus.Unregister(rm.HTTP5xx.With(lbls))
prometheus.Unregister(rm.HTTPReqElapsed.With(lbls))
}

func init() {
if !common.PrometheusEnabled {
return
}
initRouteMetrics()
initServiceMetrics()
}

func initRouteMetrics() {
lbls := []string{"service", "method", "host", "visitor", "path"}
partitionsHelp := ", partitioned by " + strings.Join(lbls, ", ")
rm = RouteMetrics{
Expand Down Expand Up @@ -66,6 +91,11 @@ func init() {
Name: "req_elapsed_ms",
Help: "How long it took to process the request and respond a status code" + partitionsHelp,
}, lbls...),
}
}

func initServiceMetrics() {
sm = ServiceMetrics{
HealthStatus: NewGauge(prometheus.GaugeOpts{
Namespace: serviceNamespace,
Name: "health_status",
Expand Down
33 changes: 13 additions & 20 deletions internal/net/http/reverse_proxy_mod.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ package http
// Copyright (c) 2024 yusing

import (
"bufio"
"bytes"
"context"
"errors"
Expand Down Expand Up @@ -123,20 +122,8 @@ func (l *httpMetricLogger) WriteHeader(status int) {
}()
}

// Hijack hijacks the connection.
func (l *httpMetricLogger) Hijack() (net.Conn, *bufio.ReadWriter, error) {
if h, ok := l.ResponseWriter.(http.Hijacker); ok {
return h.Hijack()
}

return nil, nil, fmt.Errorf("not a hijacker: %T", l.ResponseWriter)
}

// Flush sends any buffered data to the client.
func (l *httpMetricLogger) Flush() {
if flusher, ok := l.ResponseWriter.(http.Flusher); ok {
flusher.Flush()
}
func (l *httpMetricLogger) Unwrap() http.ResponseWriter {
return l.ResponseWriter
}

func singleJoiningSlash(a, b string) string {
Expand Down Expand Up @@ -208,6 +195,10 @@ func NewReverseProxy(name string, target types.URL, transport http.RoundTripper)
return rp
}

func (p *ReverseProxy) UnregisterMetrics() {
metrics.GetRouteMetrics().UnregisterService(p.TargetName)
}

func rewriteRequestURL(req *http.Request, target *url.URL) {
targetQuery := target.RawQuery
req.URL.Scheme = target.Scheme
Expand Down Expand Up @@ -280,11 +271,13 @@ func (p *ReverseProxy) handler(rw http.ResponseWriter, req *http.Request) {
if common.PrometheusEnabled {
t := time.Now()
var visitor string
if realIP := req.Header.Get("X-Real-IP"); realIP != "" {
visitor = realIP
if realIPs := req.Header.Values("X-Real-IP"); len(realIPs) > 0 {
visitor = realIPs[len(realIPs)-1]
}
if fwdIP := req.Header.Get("X-Forwarded-For"); visitor == "" && fwdIP != "" {
visitor = fwdIP
if visitor == "" {
if fwdIPs := req.Header.Values("X-Forwarded-For"); len(fwdIPs) > 0 {
visitor = fwdIPs[len(fwdIPs)-1]
}
}
if visitor == "" {
var err error
Expand Down Expand Up @@ -444,7 +437,7 @@ func (p *ReverseProxy) handler(rw http.ResponseWriter, req *http.Request) {
Proto: outreq.Proto,
ProtoMajor: outreq.ProtoMajor,
ProtoMinor: outreq.ProtoMinor,
Header: make(http.Header),
Header: http.Header{},
Body: io.NopCloser(bytes.NewReader([]byte("Origin server is not reachable."))),
Request: outreq,
TLS: outreq.TLS,
Expand Down
4 changes: 4 additions & 0 deletions internal/route/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"sync"

"github.com/rs/zerolog"
"github.com/yusing/go-proxy/internal/common"
"github.com/yusing/go-proxy/internal/docker/idlewatcher"
E "github.com/yusing/go-proxy/internal/error"
gphttp "github.com/yusing/go-proxy/internal/net/http"
Expand Down Expand Up @@ -156,6 +157,9 @@ func (r *HTTPRoute) Start(providerSubtask task.Task) E.Error {
})
}

if common.PrometheusEnabled {
r.task.OnFinished("unreg metrics", r.rp.UnregisterMetrics)
}
return nil
}

Expand Down
21 changes: 12 additions & 9 deletions internal/watcher/health/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"strings"
"time"

"github.com/prometheus/client_golang/prometheus"
"github.com/yusing/go-proxy/internal/common"
E "github.com/yusing/go-proxy/internal/error"
"github.com/yusing/go-proxy/internal/logging"
Expand All @@ -28,6 +29,8 @@ type (
checkHealth HealthCheckFunc
startTime time.Time

metric *metrics.Gauge

task task.Task
}
)
Expand Down Expand Up @@ -61,6 +64,10 @@ func (mon *monitor) Start(routeSubtask task.Task) E.Error {
return E.From(ErrNegativeInterval)
}

if common.PrometheusEnabled {
mon.metric = metrics.GetServiceMetrics().HealthStatus.With(metrics.HealthMetricLabels(mon.service))
}

go func() {
logger := logging.With().Str("name", mon.service).Logger()

Expand All @@ -69,6 +76,9 @@ func (mon *monitor) Start(routeSubtask task.Task) E.Error {
mon.status.Store(StatusUnknown)
}
mon.task.Finish(nil)
if mon.metric != nil {
prometheus.Unregister(mon.metric)
}
}()

if err := mon.checkUpdateHealth(); err != nil {
Expand Down Expand Up @@ -175,15 +185,8 @@ func (mon *monitor) checkUpdateHealth() error {
notif.Notify(mon.service, "server is down")
}
}
if common.PrometheusEnabled {
go func() {
m := metrics.GetRouteMetrics()
var up float64
if healthy {
up = 1
}
m.HealthStatus.With(metrics.HealthMetricLabels(mon.service)).Set(up)
}()
if mon.metric != nil {
mon.metric.Set(float64(status))
}

return nil
Expand Down

0 comments on commit 6194bac

Please sign in to comment.