Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add Kerberos authentication for Kafka #762

Merged
merged 58 commits into from
Nov 13, 2024
Merged
Changes from 1 commit
Commits
Show all changes
58 commits
Select commit Hold shift + click to select a range
3710b4b
added enum for either vector of autentication classes or kerberos sec…
adwk67 Sep 11, 2024
b98c155
initial kerberos impl
adwk67 Sep 13, 2024
471403d
implement kerberos specifics
adwk67 Sep 13, 2024
d90564b
wip: integration test
adwk67 Sep 19, 2024
ab47e06
revert complex enum and use parallel struct (CRD decision pending)
adwk67 Sep 20, 2024
4a855c2
call shell explicitly for kerberos probe to allow variable substitution
adwk67 Sep 20, 2024
6048906
working test
adwk67 Sep 20, 2024
35183ea
revert class name change and formatting
adwk67 Sep 20, 2024
f2dd950
added validation
adwk67 Sep 20, 2024
e5724f0
Merge branch 'main' into feat/kafka-kerberos
adwk67 Sep 20, 2024
33b3ab9
linting
adwk67 Sep 20, 2024
6bcff94
more linting
adwk67 Sep 20, 2024
e43e4fb
refactor: add kerberos to list of authentication classes instead of d…
adwk67 Sep 27, 2024
61f17c0
changelog
adwk67 Sep 27, 2024
d0c9158
reverted operator-rs ref and corrected test
adwk67 Sep 30, 2024
d0ea277
fixed changes due to operator-rs to 0.78.0
adwk67 Sep 30, 2024
eb405a9
added docs/example
adwk67 Sep 30, 2024
ed07e4e
Merge branch 'main' into feat/kafka-kerberos
adwk67 Sep 30, 2024
259fa72
improved comments
adwk67 Sep 30, 2024
13fcdc2
fixed merge conflicts
adwk67 Oct 8, 2024
edd2286
Update rust/crd/src/authentication.rs
adwk67 Oct 8, 2024
89a88db
Update rust/crd/src/lib.rs
adwk67 Oct 8, 2024
fcf34b1
Update rust/operator-binary/src/kafka_controller.rs
adwk67 Oct 8, 2024
5633a52
Update rust/operator-binary/src/kafka_controller.rs
adwk67 Oct 8, 2024
fddcd8d
Merge branch 'feat/kafka-kerberos' of github.com:stackabletech/kafka-…
adwk67 Oct 8, 2024
c2f3be0
fixed review suggestions
adwk67 Oct 8, 2024
a88146c
formatting: new lines between enum elements
adwk67 Oct 8, 2024
c32b828
review suggestions
adwk67 Oct 8, 2024
ad7aee6
add use-client-tls dimension and cleanup test
adwk67 Oct 8, 2024
ef84d73
add constants for kerberos paths
adwk67 Oct 8, 2024
2e20d5a
test: Update kerberos tests to always use TLS
sbernauer Oct 8, 2024
ed0a7df
added check that TLS is enabled for Kerberos
adwk67 Oct 8, 2024
0ad749d
regenerate charts
adwk67 Oct 8, 2024
d0c333a
formatting
adwk67 Oct 8, 2024
8aeb270
corrected validation check
adwk67 Oct 8, 2024
602b891
Update rust/operator-binary/src/kerberos.rs
adwk67 Oct 9, 2024
0da29f4
use listener volume scope for kerberos volume and replace FQDN with l…
adwk67 Oct 9, 2024
de75567
Merge branch 'main' into feat/kafka-kerberos
adwk67 Oct 9, 2024
7be6257
added custom iamge usage from previous merge from main
adwk67 Oct 9, 2024
3aa923a
Update rust/crd/src/authentication.rs
adwk67 Oct 16, 2024
203b52d
remove unecessary test
adwk67 Oct 16, 2024
5b6b7de
removed unused Error
adwk67 Oct 16, 2024
ed30069
regenerate charts
adwk67 Oct 16, 2024
2d4feac
combine cases where internal tls is required
adwk67 Oct 16, 2024
cb8301d
working test with broker listeners instead of listener bootstrap
adwk67 Oct 23, 2024
40ea435
merge main and fix conflicts
adwk67 Oct 23, 2024
c852a1b
add listener for bootstrapper
adwk67 Nov 5, 2024
b8aa750
Merge branch 'main' into feat/kafka-kerberos
adwk67 Nov 5, 2024
57b7ffc
removed duplicate check
adwk67 Nov 5, 2024
344ddf7
add bootstrap configs for client_auth as well
adwk67 Nov 6, 2024
2131479
Merge branch 'main' into feat/kafka-kerberos
adwk67 Nov 7, 2024
109ad3c
use correct port in discovery for kerberos. Removed bootstrap changes…
adwk67 Nov 7, 2024
6ced9a5
use discovery in kerberos test
adwk67 Nov 7, 2024
7526455
Merge branch 'main' into feat/kafka-kerberos
adwk67 Nov 7, 2024
b6f3c60
added unit test for kerberos config
adwk67 Nov 7, 2024
c9336ed
added note about client connections and ports
adwk67 Nov 11, 2024
20fccb9
Update docs/modules/kafka/pages/usage-guide/security.adoc
adwk67 Nov 12, 2024
74f2cb8
clarified comment
adwk67 Nov 12, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
implement kerberos specifics
adwk67 committed Sep 13, 2024

Verified

This commit was signed with the committer’s verified signature.
adwk67 Andrew Kenworthy
commit 471403dd5be46031b386ddc89f3b4175307f0617
38 changes: 38 additions & 0 deletions rust/crd/src/lib.rs
Original file line number Diff line number Diff line change
@@ -285,6 +285,23 @@ impl KafkaCluster {
tracing::debug!("Merged config: {:?}", conf_role_group);
fragment::validate(conf_role_group).context(FragmentValidationFailureSnafu)
}

pub fn has_kerberos_enabled(&self) -> bool {
self.kerberos_secret_class().is_some()
}

pub fn kerberos_secret_class(&self) -> Option<String> {
if let Some(authentication) = self.spec.cluster_config.authentication.clone() {
match authentication {
KafkaAuthenticationMethod::AuthenticationClasses(_) => None,
KafkaAuthenticationMethod::KerberosAuthentication(kerberos_authentication) => {
Some(kerberos_authentication.kerberos.secret_class)
}
}
} else {
None
}
}
}

/// Reference to a single `Pod` that is a component of a [`KafkaCluster`]
@@ -345,6 +362,12 @@ impl KafkaRole {
}
roles
}

/// A Kerberos principal has three parts, with the form username/fully.qualified.domain.name@YOUR-REALM.COM.
/// We only have one role and will use "kafka" everywhere (which e.g. differs from the current hdfs implementation).
pub fn kerberos_service_name(&self) -> &'static str {
"kafka"
}
}

#[derive(Clone, Debug, Default, PartialEq, Fragment, JsonSchema)]
@@ -495,6 +518,21 @@ impl Configuration for KafkaConfigFragment {
Some("true".to_string()),
);
}
// Kerberos
if resource.has_kerberos_enabled() {
config.insert(
"sasl.enabled.mechanisms".to_string(),
Some("GSSAPI".to_string()),
);
config.insert(
"sasl.kerberos.service.name".to_string(),
Some(KafkaRole::Broker.kerberos_service_name().to_string()),
);
config.insert(
"sasl.mechanism.inter.broker.protocol".to_string(),
Some("GSSAPI".to_string()),
);
}
}

Ok(config)
61 changes: 47 additions & 14 deletions rust/crd/src/listener.rs
Original file line number Diff line number Diff line change
@@ -24,6 +24,9 @@ pub enum KafkaListenerProtocol {
/// Encrypted and server-authenticated HTTPS connections
#[strum(serialize = "SSL")]
Ssl,
/// Kerberos authentication
#[strum(serialize = "SASL_SSL")]
SaslSsl,
}

#[derive(strum::Display, Debug, EnumString, Ord, Eq, PartialEq, PartialOrd)]
@@ -90,13 +93,13 @@ impl Display for KafkaListener {
pub fn get_kafka_listener_config(
kafka: &KafkaCluster,
kafka_security: &KafkaTlsSecurity,
object_name: &str,
pod_fqdn: &String,
) -> Result<KafkaListenerConfig, KafkaListenerError> {
let pod_fqdn = pod_fqdn(kafka, object_name)?;
let mut listeners = vec![];
let mut advertised_listeners = vec![];
let mut listener_security_protocol_map = BTreeMap::new();

// CLIENT
if kafka_security.tls_client_authentication_class().is_some() {
// 1) If client authentication required, we expose only CLIENT_AUTH connection with SSL
listeners.push(KafkaListener {
@@ -125,8 +128,22 @@ pub fn get_kafka_listener_config(
});
listener_security_protocol_map
.insert(KafkaListenerName::Client, KafkaListenerProtocol::Ssl);
} else if kafka.has_kerberos_enabled() {
// 3) Kerberos and TLS authentication classes are mutually exclusive
listeners.push(KafkaListener {
name: KafkaListenerName::Client,
host: LISTENER_LOCAL_ADDRESS.to_string(),
port: KafkaTlsSecurity::SECURE_CLIENT_PORT.to_string(),
});
advertised_listeners.push(KafkaListener {
name: KafkaListenerName::Client,
host: pod_fqdn.clone(),
port: KafkaTlsSecurity::SECURE_CLIENT_PORT.to_string(),
});
listener_security_protocol_map
.insert(KafkaListenerName::Client, KafkaListenerProtocol::SaslSsl);
} else {
// 3) If no client auth or tls is required we expose CLIENT with PLAINTEXT
// 4) If no client auth or tls is required we expose CLIENT with PLAINTEXT
listeners.push(KafkaListener {
name: KafkaListenerName::Client,
host: LISTENER_LOCAL_ADDRESS.to_string(),
@@ -141,30 +158,45 @@ pub fn get_kafka_listener_config(
.insert(KafkaListenerName::Client, KafkaListenerProtocol::Plaintext);
}

// INTERNAL
if kafka_security.tls_internal_secret_class().is_some() {
// 4) If internal tls is required we expose INTERNAL as SSL
// 5) If internal tls is required we expose INTERNAL as SSL
listeners.push(KafkaListener {
name: KafkaListenerName::Internal,
host: LISTENER_LOCAL_ADDRESS.to_string(),
port: kafka_security.internal_port().to_string(),
});
advertised_listeners.push(KafkaListener {
name: KafkaListenerName::Internal,
host: pod_fqdn,
host: pod_fqdn.to_string(),
port: kafka_security.internal_port().to_string(),
});
listener_security_protocol_map
.insert(KafkaListenerName::Internal, KafkaListenerProtocol::Ssl);
} else if kafka.has_kerberos_enabled() {
// 6) Kerberos and TLS authentication classes are mutually exclusive
listeners.push(KafkaListener {
name: KafkaListenerName::Internal,
host: LISTENER_LOCAL_ADDRESS.to_string(),
port: KafkaTlsSecurity::SECURE_INTERNAL_PORT.to_string(),
});
advertised_listeners.push(KafkaListener {
name: KafkaListenerName::Internal,
host: pod_fqdn.to_string(),
port: KafkaTlsSecurity::SECURE_INTERNAL_PORT.to_string(),
});
listener_security_protocol_map
.insert(KafkaListenerName::Internal, KafkaListenerProtocol::Ssl);
} else {
// 5) If no internal tls is required we expose INTERNAL as PLAINTEXT
// 7) If no internal tls is required we expose INTERNAL as PLAINTEXT
listeners.push(KafkaListener {
name: KafkaListenerName::Internal,
host: LISTENER_LOCAL_ADDRESS.to_string(),
port: kafka_security.internal_port().to_string(),
});
advertised_listeners.push(KafkaListener {
name: KafkaListenerName::Internal,
host: pod_fqdn,
host: pod_fqdn.to_string(),
port: kafka_security.internal_port().to_string(),
});
listener_security_protocol_map.insert(
@@ -184,7 +216,7 @@ fn node_port_cmd(directory: &str, port_name: &str) -> String {
format!("$(cat {directory}/{port_name}_nodeport)")
}

fn pod_fqdn(kafka: &KafkaCluster, object_name: &str) -> Result<String, KafkaListenerError> {
pub fn pod_fqdn(kafka: &KafkaCluster, object_name: &str) -> Result<String, KafkaListenerError> {
Ok(format!(
"$POD_NAME.{}.{}.svc.cluster.local",
object_name,
@@ -240,7 +272,8 @@ mod tests {
"internalTls".to_string(),
Some("tls".to_string()),
);
let config = get_kafka_listener_config(&kafka, &kafka_security, object_name).unwrap();
let pod_fqdn = pod_fqdn(&kafka, object_name).unwrap();
let config = get_kafka_listener_config(&kafka, &kafka_security, &pod_fqdn).unwrap();

assert_eq!(
config.listeners(),
@@ -263,7 +296,7 @@ mod tests {
host = LISTENER_NODE_ADDRESS,
port = node_port_cmd(STACKABLE_TMP_DIR, kafka_security.client_port_name()),
internal_name = KafkaListenerName::Internal,
internal_host = pod_fqdn(&kafka, object_name).unwrap(),
internal_host = &pod_fqdn,
internal_port = kafka_security.internal_port(),
)
);
@@ -300,7 +333,7 @@ mod tests {
"tls".to_string(),
Some("tls".to_string()),
);
let config = get_kafka_listener_config(&kafka, &kafka_security, object_name).unwrap();
let config = get_kafka_listener_config(&kafka, &kafka_security, &pod_fqdn).unwrap();

assert_eq!(
config.listeners(),
@@ -323,7 +356,7 @@ mod tests {
host = LISTENER_NODE_ADDRESS,
port = node_port_cmd(STACKABLE_TMP_DIR, kafka_security.client_port_name()),
internal_name = KafkaListenerName::Internal,
internal_host = pod_fqdn(&kafka, object_name).unwrap(),
internal_host = &pod_fqdn,
internal_port = kafka_security.internal_port(),
)
);
@@ -362,7 +395,7 @@ mod tests {
"".to_string(),
None,
);
let config = get_kafka_listener_config(&kafka, &kafka_security, object_name).unwrap();
let config = get_kafka_listener_config(&kafka, &kafka_security, &pod_fqdn).unwrap();

assert_eq!(
config.listeners(),
@@ -385,7 +418,7 @@ mod tests {
host = LISTENER_NODE_ADDRESS,
port = node_port_cmd(STACKABLE_TMP_DIR, kafka_security.client_port_name()),
internal_name = KafkaListenerName::Internal,
internal_host = pod_fqdn(&kafka, object_name).unwrap(),
internal_host = &pod_fqdn,
internal_port = kafka_security.internal_port(),
)
);
20 changes: 17 additions & 3 deletions rust/crd/src/security.rs
Original file line number Diff line number Diff line change
@@ -24,7 +24,8 @@ use stackable_operator::{
};

use crate::{
authentication::KafkaAuthenticationClass, KafkaAuthenticationMethod, STACKABLE_LOG_DIR,
authentication::KafkaAuthenticationClass, KafkaAuthenticationMethod, KafkaRole,
STACKABLE_LOG_DIR,
};
use crate::{
authentication::{self, ResolvedAuthenticationClasses},
@@ -260,26 +261,39 @@ impl<'a> KafkaTlsSecurity<'a> {
&self,
kafka_listeners: &KafkaListenerConfig,
opa_connect_string: Option<&str>,
kerberos_enabled: bool,
pod_fqdn: &String,
) -> Vec<String> {
vec![formatdoc! {"
{COMMON_BASH_TRAP_FUNCTIONS}
{remove_vector_shutdown_file_command}
prepare_signal_handlers
bin/kafka-server-start.sh {STACKABLE_CONFIG_DIR}/{SERVER_PROPERTIES_FILE} --override \"zookeeper.connect=$ZOOKEEPER\" --override \"listeners={listeners}\" --override \"advertised.listeners={advertised_listeners}\" --override \"listener.security.protocol.map={listener_security_protocol_map}\"{opa_config} &
{set_realm_env}
bin/kafka-server-start.sh {STACKABLE_CONFIG_DIR}/{SERVER_PROPERTIES_FILE} --override \"zookeeper.connect=$ZOOKEEPER\" --override \"listeners={listeners}\" --override \"advertised.listeners={advertised_listeners}\" --override \"listener.security.protocol.map={listener_security_protocol_map}\"{opa_config}{jaas_config} &
wait_for_termination $!
{create_vector_shutdown_file_command}
",
remove_vector_shutdown_file_command =
remove_vector_shutdown_file_command(STACKABLE_LOG_DIR),
create_vector_shutdown_file_command =
create_vector_shutdown_file_command(STACKABLE_LOG_DIR),
set_realm_env = match kerberos_enabled {
true => "KERBEROS_REALM=$(grep -oP 'default_realm = \\K.*' /stackable/kerberos/krb5.conf)",
false => "",
},
listeners = kafka_listeners.listeners(),
advertised_listeners = kafka_listeners.advertised_listeners(),
listener_security_protocol_map = kafka_listeners.listener_security_protocol_map(),
opa_config = match opa_connect_string {
None => "".to_string(),
Some(opa_connect_string) => format!(" --override \"opa.authorizer.url={opa_connect_string}\""),
}
},
jaas_config = match kerberos_enabled {
true => {
let service_name = KafkaRole::Broker.kerberos_service_name();
format!(" --override \"listener.name.client.gssapi.sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab=\"/stackable/kerberos/keytab\" principal=\"{service_name}/{pod_fqdn}@$KERBEROS_REALM\";\"")},
false => "".to_string(),
},
}]
}

46 changes: 36 additions & 10 deletions rust/operator-binary/src/kafka_controller.rs
Original file line number Diff line number Diff line change
@@ -12,10 +12,11 @@ use product_config::{
};
use snafu::{OptionExt, ResultExt, Snafu};
use stackable_kafka_crd::{
listener::get_kafka_listener_config, security::KafkaTlsSecurity, Container, KafkaCluster,
KafkaClusterStatus, KafkaConfig, KafkaRole, APP_NAME, DOCKER_IMAGE_BASE_NAME,
JVM_SECURITY_PROPERTIES_FILE, KAFKA_HEAP_OPTS, LOG_DIRS_VOLUME_NAME, METRICS_PORT,
METRICS_PORT_NAME, OPERATOR_NAME, SERVER_PROPERTIES_FILE, STACKABLE_CONFIG_DIR,
listener::{get_kafka_listener_config, pod_fqdn, KafkaListenerError},
security::KafkaTlsSecurity,
Container, KafkaCluster, KafkaClusterStatus, KafkaConfig, KafkaRole, APP_NAME,
DOCKER_IMAGE_BASE_NAME, JVM_SECURITY_PROPERTIES_FILE, KAFKA_HEAP_OPTS, LOG_DIRS_VOLUME_NAME,
METRICS_PORT, METRICS_PORT_NAME, OPERATOR_NAME, SERVER_PROPERTIES_FILE, STACKABLE_CONFIG_DIR,
STACKABLE_DATA_DIR, STACKABLE_LOG_CONFIG_DIR, STACKABLE_LOG_DIR, STACKABLE_TMP_DIR,
};
use stackable_operator::{
@@ -71,6 +72,7 @@ use strum::{EnumDiscriminants, IntoStaticStr};

use crate::{
discovery::{self, build_discovery_configmaps},
kerberos::{self, add_kerberos_pod_config},
operations::{
graceful_shutdown::{add_graceful_shutdown_config, graceful_shutdown_config_properties},
pdb::add_pdbs,
@@ -308,6 +310,12 @@ pub enum Error {
AddVolumesAndVolumeMounts {
source: stackable_kafka_crd::security::Error,
},

#[snafu(display("failed to resolve the fully-qualified pod name"))]
ResolveNamespace { source: KafkaListenerError },

#[snafu(display("failed to add kerberos config"))]
AddKerberosConfig { source: kerberos::Error },
}
type Result<T, E = Error> = std::result::Result<T, E>;

@@ -365,6 +373,8 @@ impl ReconcilerError for Error {
Error::MetadataBuild { .. } => None,
Error::LabelBuild { .. } => None,
Error::AddVolumesAndVolumeMounts { .. } => None,
Error::ResolveNamespace { .. } => None,
Error::AddKerberosConfig { .. } => None,
}
}
}
@@ -749,7 +759,7 @@ fn build_broker_rolegroup_service(
#[allow(clippy::too_many_arguments)]
fn build_broker_rolegroup_statefulset(
kafka: &KafkaCluster,
role: &KafkaRole,
kafka_role: &KafkaRole,
resolved_product_image: &ResolvedProductImage,
rolegroup_ref: &RoleGroupRef<KafkaCluster>,
broker_config: &HashMap<PropertyNameKind, BTreeMap<String, String>>,
@@ -758,7 +768,7 @@ fn build_broker_rolegroup_statefulset(
merged_config: &KafkaConfig,
sa_name: &str,
) -> Result<StatefulSet> {
let role = kafka.role(role).context(InternalOperatorSnafu)?;
let role = kafka.role(kafka_role).context(InternalOperatorSnafu)?;
let rolegroup = kafka
.rolegroup(rolegroup_ref)
.context(InternalOperatorSnafu)?;
@@ -788,6 +798,17 @@ fn build_broker_rolegroup_statefulset(
.add_volume_and_volume_mounts(&mut pod_builder, &mut cb_kcat_prober, &mut cb_kafka)
.context(AddVolumesAndVolumeMountsSnafu)?;

if kafka.has_kerberos_enabled() {
add_kerberos_pod_config(
kafka,
kafka_role,
&mut cb_kcat_prober,
&mut cb_kafka,
&mut pod_builder,
)
.context(AddKerberosConfigSnafu)?;
}

cb_get_svc
.image_from_product_image(resolved_product_image)
.command(vec![
@@ -882,9 +903,9 @@ fn build_broker_rolegroup_statefulset(
let jvm_args = format!(
"-Djava.security.properties={STACKABLE_CONFIG_DIR}/{JVM_SECURITY_PROPERTIES_FILE} -javaagent:/stackable/jmx/jmx_prometheus_javaagent.jar={METRICS_PORT}:/stackable/jmx/broker.yaml",
);
let kafka_listeners =
get_kafka_listener_config(kafka, kafka_security, &rolegroup_ref.object_name())
.context(InvalidKafkaListenersSnafu)?;
let pod_fqdn = pod_fqdn(kafka, &rolegroup_ref.object_name()).context(ResolveNamespaceSnafu)?;
let kafka_listeners = get_kafka_listener_config(kafka, kafka_security, &pod_fqdn)
.context(InvalidKafkaListenersSnafu)?;

cb_kafka
.image_from_product_image(resolved_product_image)
@@ -896,7 +917,12 @@ fn build_broker_rolegroup_statefulset(
"-c".to_string(),
])
.args(vec![kafka_security
.kafka_container_commands(&kafka_listeners, opa_connect_string)
.kafka_container_commands(
&kafka_listeners,
opa_connect_string,
kafka.has_kerberos_enabled(),
&pod_fqdn,
)
.join("\n")])
.add_env_var("EXTRA_ARGS", jvm_args)
.add_env_var(
52 changes: 52 additions & 0 deletions rust/operator-binary/src/kerberos.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
use snafu::{ResultExt, Snafu};
use stackable_kafka_crd::{KafkaCluster, KafkaRole};
use stackable_operator::{
builder::pod::{
container::ContainerBuilder,
volume::{
SecretOperatorVolumeSourceBuilder, SecretOperatorVolumeSourceBuilderError,
VolumeBuilder,
},
PodBuilder,
},
kube::ResourceExt,
};

#[derive(Snafu, Debug)]
pub enum Error {
#[snafu(display("failed to add Kerberos secret volume"))]
AddKerberosSecretVolume {
source: SecretOperatorVolumeSourceBuilderError,
},
}

pub fn add_kerberos_pod_config(
kafka: &KafkaCluster,
role: &KafkaRole,
cb_kcat_prober: &mut ContainerBuilder,
cb_kafka: &mut ContainerBuilder,
pb: &mut PodBuilder,
) -> Result<(), Error> {
if let Some(kerberos_secret_class) = kafka.kerberos_secret_class() {
// Mount keytab
let kerberos_secret_operator_volume =
SecretOperatorVolumeSourceBuilder::new(kerberos_secret_class)
.with_service_scope(kafka.name_any())
.with_pod_scope()
.with_kerberos_service_name(role.kerberos_service_name())
.build()
.context(AddKerberosSecretVolumeSnafu)?;
pb.add_volume(
VolumeBuilder::new("kerberos")
.ephemeral(kerberos_secret_operator_volume)
.build(),
);
cb_kcat_prober.add_volume_mount("kerberos", "/stackable/kerberos");
cb_kcat_prober.add_env_var("KRB5_CONFIG", "/stackable/kerberos/krb5.conf");

cb_kafka.add_volume_mount("kerberos", "/stackable/kerberos");
cb_kafka.add_env_var("KRB5_CONFIG", "/stackable/kerberos/krb5.conf");
}

Ok(())
}
1 change: 1 addition & 0 deletions rust/operator-binary/src/main.rs
Original file line number Diff line number Diff line change
@@ -24,6 +24,7 @@ use crate::{

mod discovery;
mod kafka_controller;
mod kerberos;
mod operations;
mod pod_svc_controller;
mod product_logging;