Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for SASL_PLAINTEXT authentication with Kafka broker #3056

Merged
merged 3 commits into from
Feb 18, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -42,15 +42,82 @@ instances:
# this has no effect.
# zk_prefix: /0.8

# SSL Configuration
### SASL Username/Password Configuration ###

# ssl_cafile: /path/to/pem/file
# security_protocol: PLAINTEXT
## @param security_protocol - string - required
## Protocol used to communicate with brokers.
## Valid values are: PLAINTEXT, SSL. Default: PLAINTEXT.
#
security_protocol: PLAINTEXT

## @param sasl_mechanism - string - optional
## String picking sasl mechanism when security_protocol is SASL_PLAINTEXT or SASL_SSL.
## Currently only PLAIN is supported.
#
# sasl_mechanism: SASL_PLAINTEXT

## @param sasl_plain_username - string - optional
## Username for sasl PLAIN authentication.
#
# sasl_plain_username: username

## @param sasl_plain_password - string - optional
## Password for sasl PLAIN authentication.
#
# sasl_plain_password: password

## @param sasl_kerberos_service_name - string - optional - default:kafka
## Service name to include in GSSAPI sasl mechanism handshake.
#
# sasl_kerberos_service_name: kafka

## @param sasl_kerberos_domain_name - string - optional - default:one of the bootstrap servers
## Kerberos domain name to use in GSSAPI sasl mechanism handshake.
#
# sasl_kerberos_domain_name: localhost

### SSL Configuration ###

## @param ssl_context - string - optional
## Pre-configured SSLContext for wrapping socket connections.
## If provided, all other ssl_* configurations will be ignored.
#
# ssl_context:

## @param ssl_check_hostname - string - optional - default:True
## Flag to configure whether SSL handshake should verify that the
## certificate matches the broker’s hostname.
#
# ssl_check_hostname: True

## @param ssl_cafile - string - optional
## Optional filename of CA file to use in certificate verification.
#
# ssl_cafile: /path/to/pem/file

## @param ssl_certfile - string - optional
## Optional filename of file in PEM format containing the client certificate,
## as well as any CA certificates needed to establish the certificate’s authenticity.
#
# ssl_certfile: /path/to/pem/file

## @param ssl_keyfile - string - optional
## Optional filename containing the client private key.
#
# ssl_keyfile: /path/to/key/file

## @param ssl_password - string - optional
## Optional password to be used when loading the certificate chain.
#
# ssl_password: password1

## @param ssl_crlfile - string - optional
## Optional filename containing the CRL to check for certificate expiration.
## By default, no CRL check is done. When providing a file, only the leaf certificate
## will be checked against this CRL. The CRL can only be checked with Python 3.4+ or 2.7.9+.
#
# ssl_crlfile:

# kafka_consumer_offsets: false
consumer_groups:
my_consumer: # consumer group name
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,10 +161,16 @@ def _get_kafka_client(self, instance):
cli = KafkaClient(bootstrap_servers=kafka_conn_str,
client_id='dd-agent',
security_protocol=instance.get('security_protocol', 'PLAINTEXT'),
sasl_mechanism=instance.get('sasl_mechanism'),
sasl_plain_username=instance.get('sasl_plain_username'),
sasl_plain_password=instance.get('sasl_plain_password'),
sasl_kerberos_service_name=instance.get('sasl_kerberos_service_name', 'kafka'),
sasl_kerberos_domain_name=instance.get('sasl_kerberos_domain_name'),
ssl_cafile=instance.get('ssl_cafile'),
ssl_check_hostname=instance.get('ssl_check_hostname', True),
ssl_certfile=instance.get('ssl_certfile'),
ssl_keyfile=instance.get('ssl_keyfile'),
ssl_crlfile=instance.get('ssl_crlfile'),
ssl_password=instance.get('ssl_password'))
self.kafka_clients[instance_key] = cli

Expand Down