diff --git a/control-plane/pkg/reconciler/broker/broker.go b/control-plane/pkg/reconciler/broker/broker.go index 9cee1b17ef..cc11d2344e 100644 --- a/control-plane/pkg/reconciler/broker/broker.go +++ b/control-plane/pkg/reconciler/broker/broker.go @@ -22,6 +22,9 @@ import ( "strings" "time" + "knative.dev/eventing/pkg/auth" + "knative.dev/pkg/logging" + "go.uber.org/zap" corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" @@ -265,7 +268,6 @@ func (r *Reconciler) reconcileKind(ctx context.Context, broker *eventing.Broker) addressableStatus.Address = &httpAddress addressableStatus.Addresses = []duckv1.Addressable{httpAddress} } - proberAddressable := prober.ProberAddressable{ AddressStatus: &addressableStatus, ResourceKey: types.NamespacedName{ @@ -282,6 +284,26 @@ func (r *Reconciler) reconcileKind(ctx context.Context, broker *eventing.Broker) broker.Status.Address = addressableStatus.Address broker.Status.Addresses = addressableStatus.Addresses + + if feature.FromContext(ctx).IsOIDCAuthentication() && broker.Status.Address != nil { + audience := auth.GetAudience(eventing.SchemeGroupVersion.WithKind("Broker"), broker.ObjectMeta) + logging.FromContext(ctx).Debugw("Setting the brokers audience", zap.String("audience", audience)) + broker.Status.Address.Audience = &audience + + for i := range broker.Status.Addresses { + broker.Status.Addresses[i].Audience = &audience + } + } else { + logging.FromContext(ctx).Debug("Clearing the brokers audience as OIDC is not enabled") + if broker.Status.Address != nil { + broker.Status.Address.Audience = nil + } + + for i := range broker.Status.Addresses { + broker.Status.Addresses[i].Audience = nil + } + } + broker.GetConditionSet().Manage(broker.GetStatus()).MarkTrue(base.ConditionAddressable) return nil diff --git a/control-plane/pkg/reconciler/broker/broker_test.go b/control-plane/pkg/reconciler/broker/broker_test.go index 6c46dd8ee1..08a1e3a1ac 100644 --- a/control-plane/pkg/reconciler/broker/broker_test.go +++ b/control-plane/pkg/reconciler/broker/broker_test.go @@ -23,6 +23,8 @@ import ( "testing" "text/template" + "knative.dev/eventing/pkg/auth" + "knative.dev/eventing-kafka-broker/control-plane/pkg/counter" "k8s.io/apimachinery/pkg/runtime/schema" @@ -106,6 +108,10 @@ var ( linear = eventingduck.BackoffPolicyLinear exponential = eventingduck.BackoffPolicyExponential customBrokerTopicTemplate = customTemplate() + brokerAudience = auth.GetAudience(eventing.SchemeGroupVersion.WithKind("Broker"), metav1.ObjectMeta{ + Name: BrokerName, + Namespace: BrokerNamespace, + }) ) var DefaultEnv = &config.Env{ @@ -121,7 +127,6 @@ var DefaultEnv = &config.Env{ func TestBrokerReconciler(t *testing.T) { eventing.RegisterAlternateBrokerConditionSet(base.IngressConditionSet) - t.Parallel() for _, f := range Formats { @@ -2221,6 +2226,105 @@ func brokerReconciliation(t *testing.T, format string, env config.Env) { }), }, }, + { + Name: "Should provision audience if authentication enabled", + Objects: []runtime.Object{ + NewBroker( + WithBrokerConfig(KReference(BrokerConfig(bootstrapServers, 20, 5, + BrokerAuthConfig("secret-1"), + ))), + ), + NewSSLSecret(ConfigMapNamespace, "secret-1"), + BrokerConfig(bootstrapServers, 20, 5, BrokerAuthConfig("secret-1")), + NewConfigMapWithBinaryData(env.DataPlaneConfigMapNamespace, env.ContractConfigMapName, nil), + NewService(), + BrokerReceiverPod(env.SystemNamespace, map[string]string{ + "annotation_to_preserve": "value_to_preserve", + }), + BrokerDispatcherPod(env.SystemNamespace, map[string]string{ + "annotation_to_preserve": "value_to_preserve", + }), + }, + Key: testKey, + WantEvents: []string{ + finalizerUpdatedEvent, + }, + WantUpdates: []clientgotesting.UpdateActionImpl{ + SecretFinalizerUpdate("secret-1", SecretFinalizerName), + ConfigMapUpdate(env.DataPlaneConfigMapNamespace, env.ContractConfigMapName, env.ContractConfigMapFormat, &contract.Contract{ + Resources: []*contract.Resource{ + { + Uid: BrokerUUID, + Topics: []string{BrokerTopic()}, + Ingress: &contract.Ingress{Path: receiver.Path(BrokerNamespace, BrokerName)}, + BootstrapServers: bootstrapServers, + Reference: BrokerReference(), + Auth: &contract.Resource_AuthSecret{ + AuthSecret: &contract.Reference{ + Uuid: SecretUUID, + Namespace: ConfigMapNamespace, + Name: "secret-1", + Version: SecretResourceVersion, + }, + }, + }, + }, + Generation: 1, + }), + BrokerReceiverPodUpdate(env.SystemNamespace, map[string]string{ + base.VolumeGenerationAnnotationKey: "1", + "annotation_to_preserve": "value_to_preserve", + }), + BrokerDispatcherPodUpdate(env.SystemNamespace, map[string]string{ + base.VolumeGenerationAnnotationKey: "1", + "annotation_to_preserve": "value_to_preserve", + }), + }, + WantPatches: []clientgotesting.PatchActionImpl{ + patchFinalizers(), + }, + WantStatusUpdates: []clientgotesting.UpdateActionImpl{ + { + Object: NewBroker( + WithBrokerConfig(KReference(BrokerConfig(bootstrapServers, 20, 5, + BrokerAuthConfig("secret-1"), + ))), + reconcilertesting.WithInitBrokerConditions, + StatusBrokerConfigMapUpdatedReady(&env), + StatusBrokerDataPlaneAvailable, + StatusBrokerConfigParsed, + StatusBrokerTopicReady, + BrokerConfigMapAnnotations(), + WithTopicStatusAnnotation(BrokerTopic()), + BrokerConfigMapSecretAnnotation("secret-1"), + BrokerAddressable(&env), + StatusBrokerProbeSucceeded, + WithBrokerAddresses([]duckv1.Addressable{ + { + Name: pointer.String("http"), + URL: brokerAddress, + Audience: &brokerAudience, + }, + }), + WithBrokerAddress(duckv1.Addressable{ + Name: pointer.String("http"), + URL: brokerAddress, + Audience: &brokerAudience, + }), + WithBrokerAddessable(), + ), + }, + }, + OtherTestData: map[string]interface{}{ + ExpectedTopicDetail: sarama.TopicDetail{ + NumPartitions: 20, + ReplicationFactor: 5, + }, + }, + Ctx: feature.ToContext(context.Background(), feature.Flags{ + feature.OIDCAuthentication: feature.Enabled, + }), + }, } for i := range table {