From f08b10c5bc95caa153600af4dc5b6624a475411d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Manuel=20R=C3=BCger?= Date: Fri, 7 Jun 2024 16:56:19 +0200 Subject: [PATCH] feat: Support zstd compression (#1496) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * feat: Support zstd encoding This allows endpoints to respond with zstd compressed metric data, if the requester supports it. I have imported a content-encoding parser from https://github.com/golang/gddo which is an archived repository to support different content-encoding headers. Signed-off-by: Manuel Rüger * Update prometheus/promhttp/http.go Co-authored-by: Bartlomiej Plotka Signed-off-by: Manuel Rüger * Update prometheus/promhttp/http.go Co-authored-by: Bartlomiej Plotka Signed-off-by: Manuel Rüger * Update prometheus/promhttp/http.go Co-authored-by: Bartlomiej Plotka Signed-off-by: Manuel Rüger * Integrate review comments * String typed enum Signed-off-by: Manuel Rüger * Test with gzip compression Signed-off-by: Manuel Rüger * Update prometheus/promhttp/http.go Co-authored-by: Bartlomiej Plotka Signed-off-by: Manuel Rüger * Reorder error handling Signed-off-by: Manuel Rüger * Apply suggestions from code review Co-authored-by: Bartlomiej Plotka Signed-off-by: Manuel Rüger * Include review suggestions Signed-off-by: Manuel Rüger --------- Signed-off-by: Manuel Rüger Co-authored-by: Bartlomiej Plotka --- go.mod | 2 + go.sum | 3 + internal/github.com/golang/gddo/LICENSE | 27 ++ internal/github.com/golang/gddo/README.md | 1 + .../golang/gddo/httputil/header/header.go | 145 ++++++++ .../gddo/httputil/header/header_test.go | 49 +++ .../golang/gddo/httputil/negotiate.go | 36 ++ .../golang/gddo/httputil/negotiate_test.go | 40 +++ prometheus/promhttp/http.go | 111 +++++-- prometheus/promhttp/http_test.go | 312 +++++++++++++++++- 10 files changed, 697 insertions(+), 29 deletions(-) create mode 100644 internal/github.com/golang/gddo/LICENSE create mode 100644 internal/github.com/golang/gddo/README.md create mode 100644 internal/github.com/golang/gddo/httputil/header/header.go create mode 100644 internal/github.com/golang/gddo/httputil/header/header_test.go create mode 100644 internal/github.com/golang/gddo/httputil/negotiate.go create mode 100644 internal/github.com/golang/gddo/httputil/negotiate_test.go diff --git a/go.mod b/go.mod index 0e3889f03..bbb9531ce 100644 --- a/go.mod +++ b/go.mod @@ -5,7 +5,9 @@ go 1.20 require ( github.com/beorn7/perks v1.0.1 github.com/cespare/xxhash/v2 v2.3.0 + github.com/google/go-cmp v0.6.0 github.com/json-iterator/go v1.1.12 + github.com/klauspost/compress v1.17.8 github.com/prometheus/client_model v0.6.1 github.com/prometheus/common v0.53.0 github.com/prometheus/procfs v0.15.1 diff --git a/go.sum b/go.sum index de5929d66..61a208be3 100644 --- a/go.sum +++ b/go.sum @@ -12,11 +12,14 @@ github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= +github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/jpillora/backoff v1.0.0 h1:uvFg412JmmHBHw7iwprIxkPMI+sGQ4kzOWsMeHnm2EA= github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4= github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= +github.com/klauspost/compress v1.17.8 h1:YcnTYrq7MikUT7k0Yb5eceMmALQPYBW/Xltxn0NAMnU= +github.com/klauspost/compress v1.17.8/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw= github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= diff --git a/internal/github.com/golang/gddo/LICENSE b/internal/github.com/golang/gddo/LICENSE new file mode 100644 index 000000000..65d761bc9 --- /dev/null +++ b/internal/github.com/golang/gddo/LICENSE @@ -0,0 +1,27 @@ +Copyright (c) 2013 The Go Authors. All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions are +met: + + * Redistributions of source code must retain the above copyright +notice, this list of conditions and the following disclaimer. + * Redistributions in binary form must reproduce the above +copyright notice, this list of conditions and the following disclaimer +in the documentation and/or other materials provided with the +distribution. + * Neither the name of Google Inc. nor the names of its +contributors may be used to endorse or promote products derived from +this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. diff --git a/internal/github.com/golang/gddo/README.md b/internal/github.com/golang/gddo/README.md new file mode 100644 index 000000000..69af39a33 --- /dev/null +++ b/internal/github.com/golang/gddo/README.md @@ -0,0 +1 @@ +This source code is a stripped down version from the archived repository https://github.com/golang/gddo and licensed under BSD. diff --git a/internal/github.com/golang/gddo/httputil/header/header.go b/internal/github.com/golang/gddo/httputil/header/header.go new file mode 100644 index 000000000..8547c8dfd --- /dev/null +++ b/internal/github.com/golang/gddo/httputil/header/header.go @@ -0,0 +1,145 @@ +// Copyright 2013 The Go Authors. All rights reserved. +// +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file or at +// https://developers.google.com/open-source/licenses/bsd. + +// Package header provides functions for parsing HTTP headers. +package header + +import ( + "net/http" + "strings" +) + +// Octet types from RFC 2616. +var octetTypes [256]octetType + +type octetType byte + +const ( + isToken octetType = 1 << iota + isSpace +) + +func init() { + // OCTET = + // CHAR = + // CTL = + // CR = + // LF = + // SP = + // HT = + // <"> = + // CRLF = CR LF + // LWS = [CRLF] 1*( SP | HT ) + // TEXT = + // separators = "(" | ")" | "<" | ">" | "@" | "," | ";" | ":" | "\" | <"> + // | "/" | "[" | "]" | "?" | "=" | "{" | "}" | SP | HT + // token = 1* + // qdtext = > + + for c := 0; c < 256; c++ { + var t octetType + isCtl := c <= 31 || c == 127 + isChar := 0 <= c && c <= 127 + isSeparator := strings.ContainsRune(" \t\"(),/:;<=>?@[]\\{}", rune(c)) + if strings.ContainsRune(" \t\r\n", rune(c)) { + t |= isSpace + } + if isChar && !isCtl && !isSeparator { + t |= isToken + } + octetTypes[c] = t + } +} + +// AcceptSpec describes an Accept* header. +type AcceptSpec struct { + Value string + Q float64 +} + +// ParseAccept parses Accept* headers. +func ParseAccept(header http.Header, key string) (specs []AcceptSpec) { +loop: + for _, s := range header[key] { + for { + var spec AcceptSpec + spec.Value, s = expectTokenSlash(s) + if spec.Value == "" { + continue loop + } + spec.Q = 1.0 + s = skipSpace(s) + if strings.HasPrefix(s, ";") { + s = skipSpace(s[1:]) + if !strings.HasPrefix(s, "q=") { + continue loop + } + spec.Q, s = expectQuality(s[2:]) + if spec.Q < 0.0 { + continue loop + } + } + specs = append(specs, spec) + s = skipSpace(s) + if !strings.HasPrefix(s, ",") { + continue loop + } + s = skipSpace(s[1:]) + } + } + return +} + +func skipSpace(s string) (rest string) { + i := 0 + for ; i < len(s); i++ { + if octetTypes[s[i]]&isSpace == 0 { + break + } + } + return s[i:] +} + +func expectTokenSlash(s string) (token, rest string) { + i := 0 + for ; i < len(s); i++ { + b := s[i] + if (octetTypes[b]&isToken == 0) && b != '/' { + break + } + } + return s[:i], s[i:] +} + +func expectQuality(s string) (q float64, rest string) { + switch { + case len(s) == 0: + return -1, "" + case s[0] == '0': + q = 0 + case s[0] == '1': + q = 1 + default: + return -1, "" + } + s = s[1:] + if !strings.HasPrefix(s, ".") { + return q, s + } + s = s[1:] + i := 0 + n := 0 + d := 1 + for ; i < len(s); i++ { + b := s[i] + if b < '0' || b > '9' { + break + } + n = n*10 + int(b) - '0' + d *= 10 + } + return q + float64(n)/float64(d), s[i:] +} diff --git a/internal/github.com/golang/gddo/httputil/header/header_test.go b/internal/github.com/golang/gddo/httputil/header/header_test.go new file mode 100644 index 000000000..e26eb6c30 --- /dev/null +++ b/internal/github.com/golang/gddo/httputil/header/header_test.go @@ -0,0 +1,49 @@ +// Copyright 2013 The Go Authors. All rights reserved. +// +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file or at +// https://developers.google.com/open-source/licenses/bsd. + +package header + +import ( + "net/http" + "testing" + + "github.com/google/go-cmp/cmp" +) + +var parseAcceptTests = []struct { + s string + expected []AcceptSpec +}{ + {"text/html", []AcceptSpec{{"text/html", 1}}}, + {"text/html; q=0", []AcceptSpec{{"text/html", 0}}}, + {"text/html; q=0.0", []AcceptSpec{{"text/html", 0}}}, + {"text/html; q=1", []AcceptSpec{{"text/html", 1}}}, + {"text/html; q=1.0", []AcceptSpec{{"text/html", 1}}}, + {"text/html; q=0.1", []AcceptSpec{{"text/html", 0.1}}}, + {"text/html;q=0.1", []AcceptSpec{{"text/html", 0.1}}}, + {"text/html, text/plain", []AcceptSpec{{"text/html", 1}, {"text/plain", 1}}}, + {"text/html; q=0.1, text/plain", []AcceptSpec{{"text/html", 0.1}, {"text/plain", 1}}}, + {"iso-8859-5, unicode-1-1;q=0.8,iso-8859-1", []AcceptSpec{{"iso-8859-5", 1}, {"unicode-1-1", 0.8}, {"iso-8859-1", 1}}}, + {"iso-8859-1", []AcceptSpec{{"iso-8859-1", 1}}}, + {"*", []AcceptSpec{{"*", 1}}}, + {"da, en-gb;q=0.8, en;q=0.7", []AcceptSpec{{"da", 1}, {"en-gb", 0.8}, {"en", 0.7}}}, + {"da, q, en-gb;q=0.8", []AcceptSpec{{"da", 1}, {"q", 1}, {"en-gb", 0.8}}}, + {"image/png, image/*;q=0.5", []AcceptSpec{{"image/png", 1}, {"image/*", 0.5}}}, + + // bad cases + {"value1; q=0.1.2", []AcceptSpec{{"value1", 0.1}}}, + {"da, en-gb;q=foo", []AcceptSpec{{"da", 1}}}, +} + +func TestParseAccept(t *testing.T) { + for _, tt := range parseAcceptTests { + header := http.Header{"Accept": {tt.s}} + actual := ParseAccept(header, "Accept") + if !cmp.Equal(actual, tt.expected) { + t.Errorf("ParseAccept(h, %q)=%v, want %v", tt.s, actual, tt.expected) + } + } +} diff --git a/internal/github.com/golang/gddo/httputil/negotiate.go b/internal/github.com/golang/gddo/httputil/negotiate.go new file mode 100644 index 000000000..2e45780b7 --- /dev/null +++ b/internal/github.com/golang/gddo/httputil/negotiate.go @@ -0,0 +1,36 @@ +// Copyright 2013 The Go Authors. All rights reserved. +// +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file or at +// https://developers.google.com/open-source/licenses/bsd. + +package httputil + +import ( + "net/http" + + "github.com/prometheus/client_golang/internal/github.com/golang/gddo/httputil/header" +) + +// NegotiateContentEncoding returns the best offered content encoding for the +// request's Accept-Encoding header. If two offers match with equal weight and +// then the offer earlier in the list is preferred. If no offers are +// acceptable, then "" is returned. +func NegotiateContentEncoding(r *http.Request, offers []string) string { + bestOffer := "identity" + bestQ := -1.0 + specs := header.ParseAccept(r.Header, "Accept-Encoding") + for _, offer := range offers { + for _, spec := range specs { + if spec.Q > bestQ && + (spec.Value == "*" || spec.Value == offer) { + bestQ = spec.Q + bestOffer = offer + } + } + } + if bestQ == 0 { + bestOffer = "" + } + return bestOffer +} diff --git a/internal/github.com/golang/gddo/httputil/negotiate_test.go b/internal/github.com/golang/gddo/httputil/negotiate_test.go new file mode 100644 index 000000000..cdd5807ca --- /dev/null +++ b/internal/github.com/golang/gddo/httputil/negotiate_test.go @@ -0,0 +1,40 @@ +// Copyright 2013 The Go Authors. All rights reserved. +// +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file or at +// https://developers.google.com/open-source/licenses/bsd. + +package httputil + +import ( + "net/http" + "testing" +) + +var negotiateContentEncodingTests = []struct { + s string + offers []string + expect string +}{ + {"", []string{"identity", "gzip"}, "identity"}, + {"*;q=0", []string{"identity", "gzip"}, ""}, + {"gzip", []string{"identity", "gzip"}, "gzip"}, + {"gzip,zstd", []string{"identity", "zstd"}, "zstd"}, + {"zstd,gzip", []string{"gzip", "zstd"}, "gzip"}, + {"gzip,zstd", []string{"gzip", "zstd"}, "gzip"}, + {"gzip,zstd", []string{"zstd", "gzip"}, "zstd"}, + {"gzip;q=0.1,zstd;q=0.5", []string{"gzip", "zstd"}, "zstd"}, + {"gzip;q=1.0, identity; q=0.5, *;q=0", []string{"identity", "gzip"}, "gzip"}, + {"gzip;q=1.0, identity; q=0.5, *;q=0", []string{"identity", "zstd"}, "identity"}, + {"zstd", []string{"identity", "gzip"}, "identity"}, +} + +func TestNegotiateContentEncoding(t *testing.T) { + for _, tt := range negotiateContentEncodingTests { + r := &http.Request{Header: http.Header{"Accept-Encoding": {tt.s}}} + actual := NegotiateContentEncoding(r, tt.offers) + if actual != tt.expect { + t.Errorf("NegotiateContentEncoding(%q, %#v)=%q, want %q", tt.s, tt.offers, actual, tt.expect) + } + } +} diff --git a/prometheus/promhttp/http.go b/prometheus/promhttp/http.go index 09b8d2fbe..2e0b9a864 100644 --- a/prometheus/promhttp/http.go +++ b/prometheus/promhttp/http.go @@ -38,12 +38,13 @@ import ( "io" "net/http" "strconv" - "strings" "sync" "time" + "github.com/klauspost/compress/zstd" "github.com/prometheus/common/expfmt" + "github.com/prometheus/client_golang/internal/github.com/golang/gddo/httputil" "github.com/prometheus/client_golang/prometheus" ) @@ -54,6 +55,18 @@ const ( processStartTimeHeader = "Process-Start-Time-Unix" ) +// Compression represents the content encodings handlers support for the HTTP +// responses. +type Compression string + +const ( + Identity Compression = "identity" + Gzip Compression = "gzip" + Zstd Compression = "zstd" +) + +var defaultCompressionFormats = []Compression{Identity, Gzip, Zstd} + var gzipPool = sync.Pool{ New: func() interface{} { return gzip.NewWriter(nil) @@ -122,6 +135,18 @@ func HandlerForTransactional(reg prometheus.TransactionalGatherer, opts HandlerO } } + // Select compression formats to offer based on default or user choice. + var compressions []string + if !opts.DisableCompression { + offers := defaultCompressionFormats + if len(opts.OfferedCompressions) > 0 { + offers = opts.OfferedCompressions + } + for _, comp := range offers { + compressions = append(compressions, string(comp)) + } + } + h := http.HandlerFunc(func(rsp http.ResponseWriter, req *http.Request) { if !opts.ProcessStartTime.IsZero() { rsp.Header().Set(processStartTimeHeader, strconv.FormatInt(opts.ProcessStartTime.Unix(), 10)) @@ -165,20 +190,20 @@ func HandlerForTransactional(reg prometheus.TransactionalGatherer, opts HandlerO } else { contentType = expfmt.Negotiate(req.Header) } - header := rsp.Header() - header.Set(contentTypeHeader, string(contentType)) + rsp.Header().Set(contentTypeHeader, string(contentType)) - w := io.Writer(rsp) - if !opts.DisableCompression && gzipAccepted(req.Header) { - header.Set(contentEncodingHeader, "gzip") - gz := gzipPool.Get().(*gzip.Writer) - defer gzipPool.Put(gz) + w, encodingHeader, closeWriter, err := negotiateEncodingWriter(req, rsp, compressions) + if err != nil { + if opts.ErrorLog != nil { + opts.ErrorLog.Println("error getting writer", err) + } + w = io.Writer(rsp) + encodingHeader = string(Identity) + } - gz.Reset(w) - defer gz.Close() + defer closeWriter() - w = gz - } + rsp.Header().Set(contentEncodingHeader, encodingHeader) enc := expfmt.NewEncoder(w, contentType) @@ -343,9 +368,19 @@ type HandlerOpts struct { // no effect on the HTTP status code because ErrorHandling is set to // ContinueOnError. Registry prometheus.Registerer - // If DisableCompression is true, the handler will never compress the - // response, even if requested by the client. + // DisableCompression disables the response encoding (compression) and + // encoding negotiation. If true, the handler will + // never compress the response, even if requested + // by the client and the OfferedCompressions field is set. DisableCompression bool + // OfferedCompressions is a set of encodings (compressions) handler will + // try to offer when negotiating with the client. This defaults to identity, gzip + // and zstd. + // NOTE: If handler can't agree with the client on the encodings or + // unsupported or empty encodings are set in OfferedCompressions, + // handler always fallbacks to no compression (identity), for + // compatibility reasons. In such cases ErrorLog will be used if set. + OfferedCompressions []Compression // The number of concurrent HTTP requests is limited to // MaxRequestsInFlight. Additional requests are responded to with 503 // Service Unavailable and a suitable message in the body. If @@ -381,19 +416,6 @@ type HandlerOpts struct { ProcessStartTime time.Time } -// gzipAccepted returns whether the client will accept gzip-encoded content. -func gzipAccepted(header http.Header) bool { - a := header.Get(acceptEncodingHeader) - parts := strings.Split(a, ",") - for _, part := range parts { - part = strings.TrimSpace(part) - if part == "gzip" || strings.HasPrefix(part, "gzip;") { - return true - } - } - return false -} - // httpError removes any content-encoding header and then calls http.Error with // the provided error and http.StatusInternalServerError. Error contents is // supposed to be uncompressed plain text. Same as with a plain http.Error, this @@ -406,3 +428,38 @@ func httpError(rsp http.ResponseWriter, err error) { http.StatusInternalServerError, ) } + +// negotiateEncodingWriter reads the Accept-Encoding header from a request and +// selects the right compression based on an allow-list of supported +// compressions. It returns a writer implementing the compression and an the +// correct value that the caller can set in the response header. +func negotiateEncodingWriter(r *http.Request, rw io.Writer, compressions []string) (_ io.Writer, encodingHeaderValue string, closeWriter func(), _ error) { + if len(compressions) == 0 { + return rw, string(Identity), func() {}, nil + } + + // TODO(mrueg): Replace internal/github.com/gddo once https://github.com/golang/go/issues/19307 is implemented. + selected := httputil.NegotiateContentEncoding(r, compressions) + + switch selected { + case "zstd": + // TODO(mrueg): Replace klauspost/compress with stdlib implementation once https://github.com/golang/go/issues/62513 is implemented. + z, err := zstd.NewWriter(rw, zstd.WithEncoderLevel(zstd.SpeedFastest)) + if err != nil { + return nil, "", func() {}, err + } + + z.Reset(rw) + return z, selected, func() { _ = z.Close() }, nil + case "gzip": + gz := gzipPool.Get().(*gzip.Writer) + gz.Reset(rw) + return gz, selected, func() { _ = gz.Close(); gzipPool.Put(gz) }, nil + case "identity": + // This means the content is not compressed. + return rw, selected, func() {}, nil + default: + // The content encoding was not implemented yet. + return nil, "", func() {}, fmt.Errorf("content compression format not recognized: %s. Valid formats are: %s", selected, defaultCompressionFormats) + } +} diff --git a/prometheus/promhttp/http_test.go b/prometheus/promhttp/http_test.go index 8ca192748..0ed8fe341 100644 --- a/prometheus/promhttp/http_test.go +++ b/prometheus/promhttp/http_test.go @@ -15,8 +15,10 @@ package promhttp import ( "bytes" + "compress/gzip" "errors" "fmt" + "io" "log" "net/http" "net/http/httptest" @@ -24,6 +26,7 @@ import ( "testing" "time" + "github.com/klauspost/compress/zstd" dto "github.com/prometheus/client_model/go" "github.com/prometheus/client_golang/prometheus" @@ -31,6 +34,11 @@ import ( type errorCollector struct{} +const ( + acceptHeader = "Accept" + acceptTextPlain = "text/plain" +) + func (e errorCollector) Describe(ch chan<- *prometheus.Desc) { ch <- prometheus.NewDesc("invalid_metric", "not helpful", nil, nil) } @@ -71,6 +79,28 @@ func (g *mockTransactionGatherer) Gather() (_ []*dto.MetricFamily, done func(), return mfs, func() { g.doneInvoked++ }, err } +func readCompressedBody(r io.Reader, comp Compression) (string, error) { + switch comp { + case Gzip: + reader, err := gzip.NewReader(r) + if err != nil { + return "", err + } + defer reader.Close() + got, err := io.ReadAll(reader) + return string(got), err + case Zstd: + reader, err := zstd.NewReader(r) + if err != nil { + return "", err + } + defer reader.Close() + got, err := io.ReadAll(reader) + return string(got), err + } + return "", fmt.Errorf("Unsupported compression") +} + func TestHandlerErrorHandling(t *testing.T) { // Create a registry that collects a MetricFamily with two elements, // another with one, and reports an error. Further down, we'll use the @@ -223,7 +253,7 @@ func TestInstrumentMetricHandler(t *testing.T) { InstrumentMetricHandler(reg, HandlerForTransactional(mReg, HandlerOpts{})) writer := httptest.NewRecorder() request, _ := http.NewRequest("GET", "/", nil) - request.Header.Add("Accept", "test/plain") + request.Header.Add(acceptHeader, acceptTextPlain) handler.ServeHTTP(writer, request) if got := mReg.gatherInvoked; got != 1 { @@ -237,6 +267,10 @@ func TestInstrumentMetricHandler(t *testing.T) { t.Errorf("got HTTP status code %d, want %d", got, want) } + if got, want := writer.Header().Get(contentEncodingHeader), string(Identity); got != want { + t.Errorf("got HTTP content encoding header %s, want %s", got, want) + } + want := "promhttp_metric_handler_requests_in_flight 1\n" if got := writer.Body.String(); !strings.Contains(got, want) { t.Errorf("got body %q, does not contain %q", got, want) @@ -278,7 +312,7 @@ func TestHandlerMaxRequestsInFlight(t *testing.T) { w2 := httptest.NewRecorder() w3 := httptest.NewRecorder() request, _ := http.NewRequest("GET", "/", nil) - request.Header.Add("Accept", "test/plain") + request.Header.Add(acceptHeader, acceptTextPlain) c := blockingCollector{Block: make(chan struct{}), CollectStarted: make(chan struct{}, 1)} reg.MustRegister(c) @@ -331,3 +365,277 @@ func TestHandlerTimeout(t *testing.T) { close(c.Block) // To not leak a goroutine. } + +func TestInstrumentMetricHandlerWithCompression(t *testing.T) { + reg := prometheus.NewRegistry() + mReg := &mockTransactionGatherer{g: reg} + handler := InstrumentMetricHandler(reg, HandlerForTransactional(mReg, HandlerOpts{DisableCompression: false})) + compression := Zstd + writer := httptest.NewRecorder() + request, _ := http.NewRequest("GET", "/", nil) + request.Header.Add(acceptHeader, acceptTextPlain) + request.Header.Add(acceptEncodingHeader, string(compression)) + + handler.ServeHTTP(writer, request) + if got := mReg.gatherInvoked; got != 1 { + t.Fatalf("unexpected number of gather invokes, want 1, got %d", got) + } + if got := mReg.doneInvoked; got != 1 { + t.Fatalf("unexpected number of done invokes, want 1, got %d", got) + } + + if got, want := writer.Code, http.StatusOK; got != want { + t.Errorf("got HTTP status code %d, want %d", got, want) + } + + if got, want := writer.Header().Get(contentEncodingHeader), string(compression); got != want { + t.Errorf("got HTTP content encoding header %s, want %s", got, want) + } + + body, err := readCompressedBody(writer.Body, compression) + want := "promhttp_metric_handler_requests_in_flight 1\n" + if got := body; !strings.Contains(got, want) { + t.Errorf("got body %q, does not contain %q, err: %v", got, want, err) + } + + want = "promhttp_metric_handler_requests_total{code=\"200\"} 0\n" + if got := body; !strings.Contains(got, want) { + t.Errorf("got body %q, does not contain %q, err: %v", got, want, err) + } + + for i := 0; i < 100; i++ { + writer.Body.Reset() + handler.ServeHTTP(writer, request) + + if got, want := mReg.gatherInvoked, i+2; got != want { + t.Fatalf("unexpected number of gather invokes, want %d, got %d", want, got) + } + if got, want := mReg.doneInvoked, i+2; got != want { + t.Fatalf("unexpected number of done invokes, want %d, got %d", want, got) + } + if got, want := writer.Code, http.StatusOK; got != want { + t.Errorf("got HTTP status code %d, want %d", got, want) + } + body, err := readCompressedBody(writer.Body, compression) + + want := "promhttp_metric_handler_requests_in_flight 1\n" + if got := body; !strings.Contains(got, want) { + t.Errorf("got body %q, does not contain %q, err: %v", got, want, err) + } + + want = fmt.Sprintf("promhttp_metric_handler_requests_total{code=\"200\"} %d\n", i+1) + if got := body; !strings.Contains(got, want) { + t.Errorf("got body %q, does not contain %q, err: %v", got, want, err) + } + } + + // Test with Zstd + compression = Zstd + request.Header.Set(acceptEncodingHeader, string(compression)) + + handler.ServeHTTP(writer, request) + + if got, want := writer.Code, http.StatusOK; got != want { + t.Errorf("got HTTP status code %d, want %d", got, want) + } + + if got, want := writer.Header().Get(contentEncodingHeader), string(compression); got != want { + t.Errorf("got HTTP content encoding header %s, want %s", got, want) + } + + body, err = readCompressedBody(writer.Body, compression) + want = "promhttp_metric_handler_requests_in_flight 1\n" + if got := body; !strings.Contains(got, want) { + t.Errorf("got body %q, does not contain %q, err: %v", got, want, err) + } + + want = "promhttp_metric_handler_requests_total{code=\"200\"} 101\n" + if got := body; !strings.Contains(got, want) { + t.Errorf("got body %q, does not contain %q, err: %v", got, want, err) + } + + for i := 101; i < 201; i++ { + writer.Body.Reset() + handler.ServeHTTP(writer, request) + + if got, want := mReg.gatherInvoked, i+2; got != want { + t.Fatalf("unexpected number of gather invokes, want %d, got %d", want, got) + } + if got, want := mReg.doneInvoked, i+2; got != want { + t.Fatalf("unexpected number of done invokes, want %d, got %d", want, got) + } + if got, want := writer.Code, http.StatusOK; got != want { + t.Errorf("got HTTP status code %d, want %d", got, want) + } + body, err := readCompressedBody(writer.Body, compression) + + want := "promhttp_metric_handler_requests_in_flight 1\n" + if got := body; !strings.Contains(got, want) { + t.Errorf("got body %q, does not contain %q, err: %v", got, want, err) + } + + want = fmt.Sprintf("promhttp_metric_handler_requests_total{code=\"200\"} %d\n", i+1) + if got := body; !strings.Contains(got, want) { + t.Errorf("got body %q, does not contain %q, err: %v", got, want, err) + } + } +} + +func TestNegotiateEncodingWriter(t *testing.T) { + var defaultCompressions []string + + for _, comp := range defaultCompressionFormats { + defaultCompressions = append(defaultCompressions, string(comp)) + } + + testCases := []struct { + name string + offeredCompressions []string + acceptEncoding string + expectedCompression string + err error + }{ + { + name: "test without compression enabled", + offeredCompressions: []string{}, + acceptEncoding: "", + expectedCompression: "identity", + err: nil, + }, + { + name: "test with compression enabled with empty accept-encoding header", + offeredCompressions: defaultCompressions, + acceptEncoding: "", + expectedCompression: "identity", + err: nil, + }, + { + name: "test with gzip compression requested", + offeredCompressions: defaultCompressions, + acceptEncoding: "gzip", + expectedCompression: "gzip", + err: nil, + }, + { + name: "test with gzip, zstd compression requested", + offeredCompressions: defaultCompressions, + acceptEncoding: "gzip,zstd", + expectedCompression: "gzip", + err: nil, + }, + { + name: "test with zstd, gzip compression requested", + offeredCompressions: defaultCompressions, + acceptEncoding: "zstd,gzip", + expectedCompression: "gzip", + err: nil, + }, + } + + for _, test := range testCases { + request, _ := http.NewRequest("GET", "/", nil) + request.Header.Add(acceptEncodingHeader, test.acceptEncoding) + rr := httptest.NewRecorder() + _, encodingHeader, _, err := negotiateEncodingWriter(request, rr, test.offeredCompressions) + + if !errors.Is(err, test.err) { + t.Errorf("got error: %v, expected: %v", err, test.err) + } + + if encodingHeader != test.expectedCompression { + t.Errorf("got different compression type: %v, expected: %v", encodingHeader, test.expectedCompression) + } + } +} + +func BenchmarkCompression(b *testing.B) { + benchmarks := []struct { + name string + compressionType string + }{ + { + name: "test with gzip compression", + compressionType: "gzip", + }, + { + name: "test with zstd compression", + compressionType: "zstd", + }, + { + name: "test with no compression", + compressionType: "identity", + }, + } + sizes := []struct { + name string + metricCount int + labelCount int + labelLength int + metricLength int + }{ + { + name: "small", + metricCount: 50, + labelCount: 5, + labelLength: 5, + metricLength: 5, + }, + { + name: "medium", + metricCount: 500, + labelCount: 10, + labelLength: 5, + metricLength: 10, + }, + { + name: "large", + metricCount: 5000, + labelCount: 10, + labelLength: 5, + metricLength: 10, + }, + { + name: "extra-large", + metricCount: 50000, + labelCount: 20, + labelLength: 5, + metricLength: 10, + }, + } + + for _, size := range sizes { + reg := prometheus.NewRegistry() + handler := HandlerFor(reg, HandlerOpts{}) + + // Generate Metrics + // Original source: https://github.com/prometheus-community/avalanche/blob/main/metrics/serve.go + labelKeys := make([]string, size.labelCount) + for idx := 0; idx < size.labelCount; idx++ { + labelKeys[idx] = fmt.Sprintf("label_key_%s_%v", strings.Repeat("k", size.labelLength), idx) + } + labelValues := make([]string, size.labelCount) + for idx := 0; idx < size.labelCount; idx++ { + labelValues[idx] = fmt.Sprintf("label_val_%s_%v", strings.Repeat("v", size.labelLength), idx) + } + metrics := make([]*prometheus.GaugeVec, size.metricCount) + for idx := 0; idx < size.metricCount; idx++ { + gauge := prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Name: fmt.Sprintf("avalanche_metric_%s_%v_%v", strings.Repeat("m", size.metricLength), 0, idx), + Help: "A tasty metric morsel", + }, append([]string{"series_id", "cycle_id"}, labelKeys...)) + reg.MustRegister(gauge) + metrics[idx] = gauge + } + + for _, benchmark := range benchmarks { + b.Run(benchmark.name+"_"+size.name, func(b *testing.B) { + for i := 0; i < b.N; i++ { + writer := httptest.NewRecorder() + request, _ := http.NewRequest("GET", "/", nil) + request.Header.Add(acceptEncodingHeader, benchmark.compressionType) + handler.ServeHTTP(writer, request) + } + }) + } + } +}