Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add SASL&mTLS authentication support for Kafka in Promtail #4663

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions clients/cmd/promtail/promtail-kafka-sasl-plain.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
server:
http_listen_port: 9080
grpc_listen_port: 0

clients:
- url: http://localhost:3100/loki/api/v1/push

scrape_configs:
- job_name: kafka-sasl-plain
kafka:
use_incoming_timestamp: false
brokers:
- localhost:29092
authentication:
type: sasl
sasl_config:
mechanism: PLAIN
user: kafkaadmin
password: kafkaadmin-pass
use_tls: false
group_id: kafka_group
topics:
- foo
- ^promtail.*
labels:
job: kafka-sasl-plain
26 changes: 26 additions & 0 deletions clients/cmd/promtail/promtail-kafka-sasl-scram.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
server:
http_listen_port: 9080
grpc_listen_port: 0

clients:
- url: http://localhost:3100/loki/api/v1/push

scrape_configs:
- job_name: kafka-sasl-plain
kafka:
use_incoming_timestamp: false
brokers:
- localhost:29092
authentication:
type: sasl
sasl_config:
mechanism: SCRAM-SHA-512
user: kafkaadmin
password: kafkaadmin-pass
use_tls: false
group_id: kafka_group
topics:
- foo
- ^promtail.*
labels:
job: kafka-sasl-plain
28 changes: 28 additions & 0 deletions clients/cmd/promtail/promtail-kafka-sasl-ssl.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
server:
http_listen_port: 9080
grpc_listen_port: 0

clients:
- url: http://localhost:3100/loki/api/v1/push

scrape_configs:
- job_name: kafka-sasl-plain
kafka:
use_incoming_timestamp: false
brokers:
- localhost:29092
authentication:
type: sasl
sasl_config:
mechanism: PLAIN
user: kafkaadmin
password: kafkaadmin-pass
use_tls: true
ca_file: ../../../tools/kafka/secrets/promtail-kafka-ca.pem
insecure_skip_verify: true
group_id: kafka_group
topics:
- foo
- ^promtail.*
labels:
job: kafka-sasl-plain
27 changes: 27 additions & 0 deletions clients/cmd/promtail/promtail-kafka-ssl.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
server:
http_listen_port: 9080
grpc_listen_port: 0

clients:
- url: http://localhost:3100/loki/api/v1/push

scrape_configs:
- job_name: kafka-mtls
kafka:
use_incoming_timestamp: false
brokers:
- localhost:29092
authentication:
type: ssl
tls_config:
ca_file: ../../../tools/kafka/secrets/promtail-kafka-ca.pem
cert_file: ../../../tools/kafka/secrets/kafka.consumer.keystore.cer.pem
key_file: ../../../tools/kafka/secrets/kafka.consumer.keystore.key.pem
server_name: localhost
insecure_skip_verify: true
group_id: kafka_mtls_group
topics:
- foo
- ^promtail.*
labels:
job: kafka-mtls
49 changes: 49 additions & 0 deletions clients/pkg/promtail/scrapeconfig/scrapeconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ import (
"reflect"
"time"

"github.com/Shopify/sarama"
"github.com/grafana/dskit/flagext"

promconfig "github.com/prometheus/common/config"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/discovery"
Expand Down Expand Up @@ -244,6 +247,52 @@ type KafkaTargetConfig struct {

// Rebalancing strategy to use. (e.g sticky, roundrobin or range)
Assignor string `yaml:"assignor"`

// Authentication strategy with Kafka brokers
Authentication KafkaAuthentication `yaml:"authentication"`
}

// KafkaAuthenticationType specifies method to authenticate with Kafka brokers
type KafkaAuthenticationType string

const (
// KafkaAuthenticationTypeNone represents using no authentication
KafkaAuthenticationTypeNone = "none"
// KafkaAuthenticationTypeSSL represents using SSL/TLS to authenticate
KafkaAuthenticationTypeSSL = "ssl"
// KafkaAuthenticationTypeSASL represents using SASL to authenticate
KafkaAuthenticationTypeSASL = "sasl"
)

// KafkaAuthentication describe the configuration for authentication with Kafka brokers
type KafkaAuthentication struct {
// Type is authentication type
// Possible values: none, sasl and ssl (defaults to none).
Type KafkaAuthenticationType `yaml:"type"`

// TLSConfig is used for TLS encryption and authentication with Kafka brokers
TLSConfig promconfig.TLSConfig `yaml:"tls_config,omitempty"`

// SASLConfig is used for SASL authentication with Kafka brokers
SASLConfig KafkaSASLConfig `yaml:"sasl_config,omitempty"`
}

// KafkaSASLConfig describe the SASL configuration for authentication with Kafka brokers
type KafkaSASLConfig struct {
// SASL mechanism. Supports PLAIN, SCRAM-SHA-256 and SCRAM-SHA-512
Mechanism sarama.SASLMechanism `yaml:"mechanism"`

// SASL Username
User string `yaml:"user"`

// SASL Password for the User
Password flagext.Secret `yaml:"password"`

// UseTLS sets whether TLS is used with SASL
UseTLS bool `yaml:"use_tls"`

// TLSConfig is used for SASL over TLS. It is used only when UseTLS is true
TLSConfig promconfig.TLSConfig `yaml:",inline"`
}

// GcplogTargetConfig describes a scrape config to pull logs from any pubsub topic.
Expand Down
69 changes: 69 additions & 0 deletions clients/pkg/promtail/targets/kafka/authentication.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package kafka

import (
"crypto/sha256"
"crypto/sha512"
"crypto/tls"
"crypto/x509"
"os"

promconfig "github.com/prometheus/common/config"
"github.com/xdg-go/scram"
)

func createTLSConfig(cfg promconfig.TLSConfig) (*tls.Config, error) {
tc := &tls.Config{
InsecureSkipVerify: cfg.InsecureSkipVerify,
ServerName: cfg.ServerName,
}
// load ca cert
if len(cfg.CAFile) > 0 {
caCert, err := os.ReadFile(cfg.CAFile)
if err != nil {
return nil, err
}
caCertPool := x509.NewCertPool()
caCertPool.AppendCertsFromPEM(caCert)
tc.RootCAs = caCertPool
}
// load client cert
if len(cfg.CertFile) > 0 && len(cfg.KeyFile) > 0 {
cert, err := tls.LoadX509KeyPair(cfg.CertFile, cfg.KeyFile)
if err != nil {
return nil, err
}
tc.Certificates = []tls.Certificate{cert}
}
return tc, nil
}

// copied from https://github.com/Shopify/sarama/blob/44627b731c60bb90efe25573e7ef2b3f8df3fa23/examples/sasl_scram_client/scram_client.go
var (
SHA256 scram.HashGeneratorFcn = sha256.New
SHA512 scram.HashGeneratorFcn = sha512.New
)

// XDGSCRAMClient implements sarama.SCRAMClient
type XDGSCRAMClient struct {
*scram.Client
*scram.ClientConversation
scram.HashGeneratorFcn
}

func (x *XDGSCRAMClient) Begin(userName, password, authzID string) (err error) {
x.Client, err = x.HashGeneratorFcn.NewClient(userName, password, authzID)
if err != nil {
return err
}
x.ClientConversation = x.Client.NewConversation()
return nil
}

func (x *XDGSCRAMClient) Step(challenge string) (response string, err error) {
response, err = x.ClientConversation.Step(challenge)
return
}

func (x *XDGSCRAMClient) Done() bool {
return x.ClientConversation.Done()
}
72 changes: 72 additions & 0 deletions clients/pkg/promtail/targets/kafka/target_syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,10 @@ func NewSyncer(
default:
return nil, fmt.Errorf("unrecognized consumer group partition assignor: %s", cfg.KafkaConfig.Assignor)
}
config, err = withAuthentication(*config, cfg.KafkaConfig.Authentication)
if err != nil {
return nil, fmt.Errorf("error setting up kafka authentication: %w", err)
}
client, err := sarama.NewClient(cfg.KafkaConfig.Brokers, config)
if err != nil {
return nil, fmt.Errorf("error creating kafka client: %w", err)
Expand Down Expand Up @@ -113,6 +117,74 @@ func NewSyncer(
return t, nil
}

func withAuthentication(cfg sarama.Config, authCfg scrapeconfig.KafkaAuthentication) (*sarama.Config, error) {
if len(authCfg.Type) == 0 || authCfg.Type == scrapeconfig.KafkaAuthenticationTypeNone {
return &cfg, nil
}

switch authCfg.Type {
case scrapeconfig.KafkaAuthenticationTypeSSL:
return withSSLAuthentication(cfg, authCfg)
case scrapeconfig.KafkaAuthenticationTypeSASL:
return withSASLAuthentication(cfg, authCfg)
default:
return nil, fmt.Errorf("unsupported authentication type %s", authCfg.Type)
}
}

func withSSLAuthentication(cfg sarama.Config, authCfg scrapeconfig.KafkaAuthentication) (*sarama.Config, error) {
cfg.Net.TLS.Enable = true
tc, err := createTLSConfig(authCfg.TLSConfig)
if err != nil {
return nil, err
}
cfg.Net.TLS.Config = tc
return &cfg, nil
}

func withSASLAuthentication(cfg sarama.Config, authCfg scrapeconfig.KafkaAuthentication) (*sarama.Config, error) {
cfg.Net.SASL.Enable = true
cfg.Net.SASL.User = authCfg.SASLConfig.User
cfg.Net.SASL.Password = authCfg.SASLConfig.Password.Value
cfg.Net.SASL.Mechanism = authCfg.SASLConfig.Mechanism
if cfg.Net.SASL.Mechanism == "" {
cfg.Net.SASL.Mechanism = sarama.SASLTypePlaintext
}

supportedMechanism := []string{
sarama.SASLTypeSCRAMSHA512,
sarama.SASLTypeSCRAMSHA256,
sarama.SASLTypePlaintext,
}
if !util.StringSliceContains(supportedMechanism, string(authCfg.SASLConfig.Mechanism)) {
return nil, fmt.Errorf("error unsupported sasl mechanism: %s", authCfg.SASLConfig.Mechanism)
}

if cfg.Net.SASL.Mechanism == sarama.SASLTypeSCRAMSHA512 {
cfg.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient {
return &XDGSCRAMClient{
HashGeneratorFcn: SHA512,
}
}
}
if cfg.Net.SASL.Mechanism == sarama.SASLTypeSCRAMSHA256 {
cfg.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient {
return &XDGSCRAMClient{
HashGeneratorFcn: SHA256,
}
}
}
if authCfg.SASLConfig.UseTLS {
tc, err := createTLSConfig(authCfg.SASLConfig.TLSConfig)
if err != nil {
return nil, err
}
cfg.Net.TLS.Config = tc
cfg.Net.TLS.Enable = true
}
return &cfg, nil
}

func (ts *TargetSyncer) loop() {
topicChanged := make(chan []string)
ts.wg.Add(2)
Expand Down
Loading