Skip to content

Commit

Permalink
feat(inputs.influxdb_listener): Add token based authentication (#13610)
Browse files Browse the repository at this point in the history
  • Loading branch information
srebhan authored Jul 24, 2023
1 parent 54b1009 commit fe84675
Show file tree
Hide file tree
Showing 13 changed files with 258 additions and 27 deletions.
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ require (
github.com/go-stomp/stomp v2.1.4+incompatible
github.com/gobwas/glob v0.2.3
github.com/gofrs/uuid/v5 v5.0.0
github.com/golang-jwt/jwt/v4 v4.5.0
github.com/golang-jwt/jwt/v5 v5.0.0
github.com/golang/geo v0.0.0-20190916061304-5b978397cfec
github.com/golang/snappy v0.0.4
github.com/google/cel-go v0.14.1-0.20230424164844-d39523c445fc
Expand Down Expand Up @@ -308,6 +308,7 @@ require (
github.com/godbus/dbus v0.0.0-20190726142602-4481cbc300e2 // indirect
github.com/gofrs/uuid v4.2.0+incompatible // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang-jwt/jwt/v4 v4.5.0 // indirect
github.com/golang-sql/civil v0.0.0-20220223132316-b832511892a9 // indirect
github.com/golang-sql/sqlexp v0.1.0 // indirect
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -599,6 +599,8 @@ github.com/golang-jwt/jwt/v4 v4.2.0/go.mod h1:/xlHOz8bRuivTWchD4jCa+NbatV+wEUSzw
github.com/golang-jwt/jwt/v4 v4.4.3/go.mod h1:m21LjoU+eqJr34lmDMbreY2eSTRJ1cv77w39/MY0Ch0=
github.com/golang-jwt/jwt/v4 v4.5.0 h1:7cYmW1XlMY7h7ii7UhUyChSgS5wUJEnm9uZVTGqOWzg=
github.com/golang-jwt/jwt/v4 v4.5.0/go.mod h1:m21LjoU+eqJr34lmDMbreY2eSTRJ1cv77w39/MY0Ch0=
github.com/golang-jwt/jwt/v5 v5.0.0 h1:1n1XNM9hk7O9mnQoNBGolZvzebBQ7p93ULHRc28XJUE=
github.com/golang-jwt/jwt/v5 v5.0.0/go.mod h1:pqrtFR0X4osieyHYxtmOUWsAWrfe1Q5UVIyoH402zdk=
github.com/golang-sql/civil v0.0.0-20220223132316-b832511892a9 h1:au07oEsX2xN0ktxqI+Sida1w446QrXBRJ0nee3SNZlA=
github.com/golang-sql/civil v0.0.0-20220223132316-b832511892a9/go.mod h1:8vg3r2VgvsThLBIFL93Qb5yWzgyZWhEmBwUJWevAkK0=
github.com/golang-sql/sqlexp v0.1.0 h1:ZCD6MBpcuOVfGVqsEmY5/4FtYiKz6tSyUv9LPEDei6A=
Expand Down
108 changes: 96 additions & 12 deletions internal/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,88 @@ package internal
import (
"crypto/subtle"
"errors"
"fmt"
"net"
"net/http"
"net/url"
"strings"

"github.com/golang-jwt/jwt/v5"
)

type BasicAuthErrorFunc func(rw http.ResponseWriter)

// AuthHandler returns a http handler that requires HTTP basic auth
// JWTAuthHandler returns a http handler that requires the HTTP bearer auth
// token to be valid and match the given user.
func JWTAuthHandler(secret, username string, onError BasicAuthErrorFunc) func(h http.Handler) http.Handler {
return func(h http.Handler) http.Handler {
return &jwtAuthHandler{
secret: []byte(secret),
username: []byte(username),
onError: onError,
next: h,
}
}
}

type jwtAuthHandler struct {
secret []byte
username []byte
onError BasicAuthErrorFunc
next http.Handler
}

func (h *jwtAuthHandler) ServeHTTP(rw http.ResponseWriter, req *http.Request) {
authHeader := req.Header.Get("Authentication")
if !strings.HasPrefix(authHeader, "Bearer ") {
h.onError(rw)
http.Error(rw, http.StatusText(http.StatusUnauthorized), http.StatusUnauthorized)
return
}
bearer := strings.TrimPrefix(authHeader, "Bearer ")
token, err := jwt.Parse(bearer, func(t *jwt.Token) (interface{}, error) {
if _, ok := t.Method.(*jwt.SigningMethodHMAC); !ok {
return nil, fmt.Errorf("unexpected signing method: %v", t.Method)
}
return h.secret, nil
})
if err != nil || !token.Valid {
h.onError(rw)
if err != nil && errors.Is(err, jwt.ErrTokenExpired) {
http.Error(rw, "token expired", http.StatusUnauthorized)
} else if err != nil {
http.Error(rw, "invalid token: "+err.Error(), http.StatusUnauthorized)
} else {
http.Error(rw, "invalid token", http.StatusUnauthorized)
}
return
}

claims, ok := token.Claims.(jwt.MapClaims)
if !ok {
h.onError(rw)
http.Error(rw, "problem authenticating token", http.StatusInternalServerError)
return
}

username, ok := claims["username"].(string)
if !ok || username == "" {
h.onError(rw)
http.Error(rw, "token must contain a string username", http.StatusUnauthorized)
return
}
if subtle.ConstantTimeCompare([]byte(username), h.username) != 1 {
h.onError(rw)
http.Error(rw, http.StatusText(http.StatusUnauthorized), http.StatusUnauthorized)
return
}

h.next.ServeHTTP(rw, req)
}

// BasicAuthHandler returns a http handler that requires HTTP basic auth
// credentials to match the given username and password.
func AuthHandler(username, password, realm string, onError BasicAuthErrorFunc) func(h http.Handler) http.Handler {
func BasicAuthHandler(username, password, realm string, onError BasicAuthErrorFunc) func(h http.Handler) http.Handler {
return func(h http.Handler) http.Handler {
return &basicAuthHandler{
username: username,
Expand All @@ -33,16 +105,28 @@ type basicAuthHandler struct {
}

func (h *basicAuthHandler) ServeHTTP(rw http.ResponseWriter, req *http.Request) {
if h.username != "" || h.password != "" {
reqUsername, reqPassword, ok := req.BasicAuth()
if !ok ||
subtle.ConstantTimeCompare([]byte(reqUsername), []byte(h.username)) != 1 ||
subtle.ConstantTimeCompare([]byte(reqPassword), []byte(h.password)) != 1 {
rw.Header().Set("WWW-Authenticate", "Basic realm=\""+h.realm+"\"")
h.onError(rw)
http.Error(rw, http.StatusText(http.StatusUnauthorized), http.StatusUnauthorized)
return
}
if h.username == "" && h.password == "" {
h.next.ServeHTTP(rw, req)
return
}

var reqUsername, reqPassword string
var ok bool
authHeader := req.Header.Get("Authorization")
if strings.HasPrefix(authHeader, "Token ") {
token := strings.TrimPrefix(authHeader, "Token ")
reqUsername, reqPassword, ok = strings.Cut(token, ":")
} else {
reqUsername, reqPassword, ok = req.BasicAuth()
}

if !ok ||
subtle.ConstantTimeCompare([]byte(reqUsername), []byte(h.username)) != 1 ||
subtle.ConstantTimeCompare([]byte(reqPassword), []byte(h.password)) != 1 {
rw.Header().Set("WWW-Authenticate", "Basic realm=\""+h.realm+"\"")
h.onError(rw)
http.Error(rw, http.StatusText(http.StatusUnauthorized), http.StatusUnauthorized)
return
}

h.next.ServeHTTP(rw, req)
Expand Down
2 changes: 1 addition & 1 deletion plugins/inputs/dcos/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
"net/url"
"time"

"github.com/golang-jwt/jwt/v4"
"github.com/golang-jwt/jwt/v5"
)

const (
Expand Down
5 changes: 3 additions & 2 deletions plugins/inputs/dcos/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,10 @@ import (
"net/url"
"testing"

jwt "github.com/golang-jwt/jwt/v4"
"github.com/influxdata/telegraf/testutil"
jwt "github.com/golang-jwt/jwt/v5"
"github.com/stretchr/testify/require"

"github.com/influxdata/telegraf/testutil"
)

var privateKey = testutil.NewPKI("../../../testutil/pki").ReadServerKey()
Expand Down
2 changes: 1 addition & 1 deletion plugins/inputs/dcos/dcos.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
"sync"
"time"

"github.com/golang-jwt/jwt/v4"
"github.com/golang-jwt/jwt/v5"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
Expand Down
3 changes: 2 additions & 1 deletion plugins/inputs/dcos/dcos_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,9 @@ import (
"fmt"
"testing"

"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/require"

"github.com/influxdata/telegraf/testutil"
)

type mockClient struct {
Expand Down
14 changes: 13 additions & 1 deletion plugins/inputs/influxdb_listener/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -79,11 +79,23 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details.
## the value of this tag name.
# retention_policy_tag = ""

## Optional username and password to accept for HTTP basic authentication.
## Optional username and password to accept for HTTP basic authentication
## or authentication token.
## You probably want to make sure you have TLS configured above for this.
## Use these options for the authentication token in the form
## Authentication: Token <basic_username>:<basic_password>
# basic_username = "foobar"
# basic_password = "barfoo"

## Optional JWT token authentication for HTTP requests
## Please see the documentation at
## https://docs.influxdata.com/influxdb/v1.8/administration/authentication_and_authorization/#authenticate-using-jwt-tokens
## for further details.
## Please note: Token authentication and basic authentication cannot be used
## at the same time.
# token_shared_secret = ""
# token_username = ""

## Influx line protocol parser
## 'internal' is the default. 'upstream' is a newer parser that is faster
## and more memory efficient.
Expand Down
29 changes: 24 additions & 5 deletions plugins/inputs/influxdb_listener/influxdb_listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ type InfluxDBListener struct {
MaxLineSize config.Size `toml:"max_line_size" deprecated:"1.14.0;parser now handles lines of unlimited length and option is ignored"`
BasicUsername string `toml:"basic_username"`
BasicPassword string `toml:"basic_password"`
TokenSharedSecret string `toml:"token_shared_secret"`
TokenUsername string `toml:"token_username"`
DatabaseTag string `toml:"database_tag"`
RetentionPolicyTag string `toml:"retention_policy_tag"`
ParserType string `toml:"parser_type"`
Expand Down Expand Up @@ -78,11 +80,20 @@ func (h *InfluxDBListener) Gather(_ telegraf.Accumulator) error {
}

func (h *InfluxDBListener) routes() {
authHandler := internal.AuthHandler(h.BasicUsername, h.BasicPassword, "influxdb",
func(_ http.ResponseWriter) {
h.authFailures.Incr(1)
},
)
var authHandler func(http.Handler) http.Handler
if h.TokenSharedSecret != "" {
authHandler = internal.JWTAuthHandler(h.TokenSharedSecret, h.TokenUsername,
func(_ http.ResponseWriter) {
h.authFailures.Incr(1)
},
)
} else {
authHandler = internal.BasicAuthHandler(h.BasicUsername, h.BasicPassword, "influxdb",
func(_ http.ResponseWriter) {
h.authFailures.Incr(1)
},
)
}

h.mux.Handle("/write", authHandler(h.handleWrite()))
h.mux.Handle("/query", authHandler(h.handleQuery()))
Expand All @@ -91,6 +102,14 @@ func (h *InfluxDBListener) routes() {
}

func (h *InfluxDBListener) Init() error {
// Check the config setting
if (h.BasicUsername != "" || h.BasicPassword != "") && (h.TokenSharedSecret != "" || h.TokenUsername != "") {
return errors.New("cannot use basic-auth and tokens at the same time")
}
if h.TokenSharedSecret != "" && h.TokenUsername == "" || h.TokenSharedSecret == "" && h.TokenUsername != "" {
return errors.New("neither 'token_shared_secret' nor 'token_username' can be empty for token authentication")
}

tags := map[string]string{
"address": h.ServiceAddress,
}
Expand Down
99 changes: 99 additions & 0 deletions plugins/inputs/influxdb_listener/influxdb_listener_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,18 @@ import (
"crypto/tls"
"crypto/x509"
"fmt"
"io"
"net/http"
"net/url"
"os"
"runtime"
"strconv"
"strings"
"sync"
"testing"
"time"

"github.com/golang-jwt/jwt/v5"
"github.com/stretchr/testify/require"

"github.com/influxdata/telegraf"
Expand Down Expand Up @@ -163,6 +166,102 @@ func TestWriteBasicAuth(t *testing.T) {
require.EqualValues(t, http.StatusNoContent, resp.StatusCode)
}

func TestWriteToken(t *testing.T) {
plugin := &InfluxDBListener{
ServiceAddress: "localhost:0",
TokenSharedSecret: "a S3cr3T $sTr1ng",
TokenUsername: "John Doe",
Log: testutil.Logger{},
timeFunc: time.Now,
}
require.NoError(t, plugin.Init())

// Create a valid token
token, err := jwt.NewWithClaims(jwt.SigningMethodHS512, jwt.MapClaims{
"username": plugin.TokenUsername,
"exp": time.Now().Add(5 * time.Minute).Unix(),
}).SignedString([]byte(plugin.TokenSharedSecret))
require.NoError(t, err)

var acc testutil.Accumulator
require.NoError(t, plugin.Start(&acc))
defer plugin.Stop()

client := &http.Client{}
req, err := http.NewRequest("POST", createURL(plugin, "http", "/write", "db=mydb"), bytes.NewBuffer([]byte(testMsg)))
require.NoError(t, err)
req.Header.Add("Authentication", "Bearer "+token)
resp, err := client.Do(req)
require.NoError(t, err)
require.NoError(t, resp.Body.Close())
require.EqualValues(t, http.StatusNoContent, resp.StatusCode)
}

func TestWriteTokenInvalidUser(t *testing.T) {
plugin := &InfluxDBListener{
ServiceAddress: "localhost:0",
TokenSharedSecret: "a S3cr3T $sTr1ng",
TokenUsername: "John Doe",
Log: testutil.Logger{},
timeFunc: time.Now,
}
require.NoError(t, plugin.Init())

// Create a valid token
token, err := jwt.NewWithClaims(jwt.SigningMethodHS512, jwt.MapClaims{
"username": "peter",
"exp": time.Now().Add(5 * time.Minute).Unix(),
}).SignedString([]byte(plugin.TokenSharedSecret))
require.NoError(t, err)

var acc testutil.Accumulator
require.NoError(t, plugin.Start(&acc))
defer plugin.Stop()

client := &http.Client{}
req, err := http.NewRequest("POST", createURL(plugin, "http", "/write", "db=mydb"), bytes.NewBuffer([]byte(testMsg)))
require.NoError(t, err)
req.Header.Add("Authentication", "Bearer "+token)
resp, err := client.Do(req)
require.NoError(t, err)
require.NoError(t, resp.Body.Close())
require.EqualValues(t, http.StatusUnauthorized, resp.StatusCode)
}

func TestWriteTokenExpired(t *testing.T) {
plugin := &InfluxDBListener{
ServiceAddress: "localhost:0",
TokenSharedSecret: "a S3cr3T $sTr1ng",
TokenUsername: "John Doe",
Log: testutil.Logger{},
timeFunc: time.Now,
}
require.NoError(t, plugin.Init())

// Create a valid token
token, err := jwt.NewWithClaims(jwt.SigningMethodHS512, jwt.MapClaims{
"username": plugin.TokenUsername,
"exp": time.Now().Add(-5 * time.Minute).Unix(),
}).SignedString([]byte(plugin.TokenSharedSecret))
require.NoError(t, err)

var acc testutil.Accumulator
require.NoError(t, plugin.Start(&acc))
defer plugin.Stop()

client := &http.Client{}
req, err := http.NewRequest("POST", createURL(plugin, "http", "/write", "db=mydb"), bytes.NewBuffer([]byte(testMsg)))
require.NoError(t, err)
req.Header.Add("Authentication", "Bearer "+token)
resp, err := client.Do(req)
require.NoError(t, err)
body, err := io.ReadAll(resp.Body)
require.NoError(t, err)
require.NoError(t, resp.Body.Close())
require.EqualValues(t, http.StatusUnauthorized, resp.StatusCode)
require.EqualValues(t, "token expired", strings.TrimSpace(string(body)))
}

func TestWriteKeepDatabase(t *testing.T) {
testMsgWithDB := "cpu_load_short,host=server01,database=wrongdb value=12.0 1422568543702900257\n"

Expand Down
Loading

0 comments on commit fe84675

Please sign in to comment.