Skip to content

Commit

Permalink
Merge branch 'knative-extensions:main' into feature/support-expose-au…
Browse files Browse the repository at this point in the history
…dience-of-broker
  • Loading branch information
gunishmatta authored Jan 21, 2024
2 parents e14fab6 + d8e3e86 commit 6083dbf
Show file tree
Hide file tree
Showing 72 changed files with 13,387 additions and 8,530 deletions.
7 changes: 1 addition & 6 deletions OWNERS_ALIASES
Original file line number Diff line number Diff line change
Expand Up @@ -74,12 +74,8 @@ aliases:
- lberk
- matzew
eventing-rabbitmq-approvers:
- andrew-su
- chunyilyu
- gabo1208
- joeeltgroth
- salaboy
- xtreme-sameer-vohra
- tcnghia
eventing-redis-approvers:
- aavarghese
- lionelvillard
Expand Down Expand Up @@ -255,7 +251,6 @@ aliases:
- yuzisun
serving-reviewers:
- KauzClay
- jsanin-vmw
- krsna-m
- retocode
- skonto
Expand Down
10 changes: 10 additions & 0 deletions control-plane/pkg/reconciler/source/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,16 @@ func GetLabels(name string) map[string]string {

// Need to have an empty definition here to ensure that we can delete older sources which had a finalizer
func (r Reconciler) FinalizeKind(ctx context.Context, ks *sources.KafkaSource) reconciler.Event {
cg, err := r.ConsumerGroupLister.ConsumerGroups(ks.GetNamespace()).Get(string(ks.UID))
if err != nil && !apierrors.IsNotFound(err) {
return fmt.Errorf("failed to get ConsumerGroup %s/%s: %w", ks.GetNamespace(), string(ks.UID), err)
}
if apierrors.IsNotFound(err) {
return nil
}

propagateConsumerGroupStatus(cg, ks)

return nil
}

Expand Down
7 changes: 6 additions & 1 deletion control-plane/pkg/reconciler/testing/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,12 @@ func NewFactory(env *config.Env, ctor Ctor) Factory {
return func(t *testing.T, row *TableRow) (pkgcontroller.Reconciler, ActionRecorderList, EventList) {

listers := newListers(row.Objects)
ctx := context.Background()
var ctx context.Context
if row.Ctx != nil {
ctx = row.Ctx
} else {
ctx = context.Background()
}

ctx, eventingClient := fakeeventingclient.With(ctx, listers.GetEventingObjects()...)
ctx, eventingKafkaBrokerClient := fakeeventingkafkabrokerclient.With(ctx, listers.GetEventingKafkaBrokerObjects()...)
Expand Down
6 changes: 5 additions & 1 deletion control-plane/pkg/reconciler/trigger/trigger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2948,6 +2948,10 @@ func useTableWithFlags(t *testing.T, table TableTest, env *config.Env, flags fea
table.Test(t, NewFactory(env, func(ctx context.Context, listers *Listers, env *config.Env, row *TableRow) controller.Reconciler {

logger := logging.FromContext(ctx)
ctxFlags := feature.FromContextOrDefaults(ctx)
for k, v := range flags {
ctxFlags[k] = v
}

reconciler := &Reconciler{
Reconciler: &base.Reconciler{
Expand All @@ -2963,7 +2967,7 @@ func useTableWithFlags(t *testing.T, table TableTest, env *config.Env, flags fea
ReceiverLabel: base.BrokerReceiverLabel,
},
FlagsHolder: &FlagsHolder{
Flags: flags,
Flags: ctxFlags,
},
BrokerLister: listers.GetBrokerLister(),
ConfigMapLister: listers.GetConfigMapLister(),
Expand Down
2 changes: 1 addition & 1 deletion data-plane/THIRD-PARTY.txt
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ Lists of 230 third-party dependencies.
(The Apache License, Version 2.0) org.apiguardian:apiguardian-api (org.apiguardian:apiguardian-api:1.1.2 - https://github.com/apiguardian-team/apiguardian)
(Apache License, Version 2.0) AssertJ fluent assertions (org.assertj:assertj-core:3.22.0 - https://assertj.github.io/doc/assertj-core/)
(Apache 2.0) Awaitility (org.awaitility:awaitility:4.2.0 - http://awaitility.org)
(The Apache Software License, Version 2.0) jose4j (org.bitbucket.b_c:jose4j:0.7.9 - https://bitbucket.org/b_c/jose4j/)
(The Apache Software License, Version 2.0) jose4j (org.bitbucket.b_c:jose4j:0.9.4 - https://bitbucket.org/b_c/jose4j/)
(The MIT License) Checker Qual (org.checkerframework:checker-qual:3.34.0 - https://checkerframework.org/)
(Apache License, Version 2.0) MicroProfile Config API (org.eclipse.microprofile.config:microprofile-config-api:3.0.3 - https://microprofile.io/project/eclipse/microprofile-config/microprofile-config-api)
(Apache License, Version 2.0) MicroProfile Context Propagation (org.eclipse.microprofile.context-propagation:microprofile-context-propagation-api:1.3 - http://microprofile.io/microprofile-context-propagation-api)
Expand Down
5 changes: 5 additions & 0 deletions data-plane/core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,11 @@
<artifactId>jackson-databind</artifactId>
</dependency>

<dependency>
<groupId>org.bitbucket.b_c</groupId>
<artifactId>jose4j</artifactId>
</dependency>

<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-api</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
/*
* Copyright © 2018 Knative Authors (knative-dev@googlegroups.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dev.knative.eventing.kafka.broker.core.oidc;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.fabric8.kubernetes.client.Config;
import io.fabric8.kubernetes.client.ConfigBuilder;
import io.vertx.core.Future;
import io.vertx.core.Vertx;
import io.vertx.core.net.PemTrustOptions;
import io.vertx.ext.web.client.WebClient;
import io.vertx.ext.web.client.WebClientOptions;
import org.jose4j.jwk.JsonWebKeySet;
import org.jose4j.keys.resolvers.JwksVerificationKeyResolver;
import org.jose4j.lang.JoseException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class OIDCDiscoveryConfig {

private static final Logger logger = LoggerFactory.getLogger(TokenVerifier.class);

private String issuer;

private JwksVerificationKeyResolver jwksVerificationKeyResolver;

private OIDCDiscoveryConfig() {}

public String getIssuer() {
return issuer;
}

public JwksVerificationKeyResolver getJwksVerificationKeyResolver() {
return jwksVerificationKeyResolver;
}

public static Future<OIDCDiscoveryConfig> build(Vertx vertx) {
Config kubeConfig = new ConfigBuilder().build();

WebClientOptions webClientOptions = new WebClientOptions()
.setPemTrustOptions(new PemTrustOptions().addCertPath(kubeConfig.getCaCertFile()));
WebClient webClient = WebClient.create(vertx, webClientOptions);

OIDCDiscoveryConfig oidcDiscoveryConfig = new OIDCDiscoveryConfig();

return webClient
.getAbs("https://kubernetes.default.svc/.well-known/openid-configuration")
.bearerTokenAuthentication(kubeConfig.getAutoOAuthToken())
.send()
.compose(res -> {
logger.debug("Got raw OIDC discovery info: " + res.bodyAsString());

try {
ObjectMapper mapper = new ObjectMapper();
OIDCInfo oidcInfo = mapper.readValue(res.bodyAsString(), OIDCInfo.class);

oidcDiscoveryConfig.issuer = oidcInfo.getIssuer();

return webClient
.getAbs(oidcInfo.getJwks().toString())
.bearerTokenAuthentication(kubeConfig.getAutoOAuthToken())
.send();

} catch (JsonProcessingException e) {
logger.error("Failed to parse OIDC discovery info", e);

return Future.failedFuture(e);
}
})
.compose(res -> {
if (res.statusCode() >= 200 && res.statusCode() < 300) {
try {
JsonWebKeySet jsonWebKeySet = new JsonWebKeySet(res.bodyAsString());
logger.debug("Got JWKeys: " + jsonWebKeySet.toJson());

oidcDiscoveryConfig.jwksVerificationKeyResolver =
new JwksVerificationKeyResolver(jsonWebKeySet.getJsonWebKeys());

return Future.succeededFuture(oidcDiscoveryConfig);
} catch (JoseException t) {
logger.error("Failed to parse JWKeys", t);

return Future.failedFuture(t);
}
}

logger.error("Got unexpected response code for JWKey URL: " + res.statusCode());

return Future.failedFuture("unexpected response code on JWKeys URL: " + res.statusCode());
});
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Copyright © 2018 Knative Authors (knative-dev@googlegroups.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dev.knative.eventing.kafka.broker.core.oidc;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;
import java.net.URL;

@JsonIgnoreProperties(ignoreUnknown = true)
class OIDCInfo {

private String issuer;

@JsonProperty("jwks_uri")
private URL jwks;

public String getIssuer() {
return issuer;
}

public void setIssuer(String issuer) {
this.issuer = issuer;
}

public URL getJwks() {
return jwks;
}

public void setJwks(URL jwks) {
this.jwks = jwks;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/*
* Copyright © 2018 Knative Authors (knative-dev@googlegroups.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dev.knative.eventing.kafka.broker.core.oidc;

import io.vertx.core.Future;
import io.vertx.core.Vertx;
import io.vertx.core.http.HttpServerRequest;
import org.jose4j.jwt.JwtClaims;
import org.jose4j.jwt.consumer.InvalidJwtException;
import org.jose4j.jwt.consumer.JwtConsumer;
import org.jose4j.jwt.consumer.JwtConsumerBuilder;
import org.jose4j.jwt.consumer.JwtContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TokenVerifier {

private static final Logger logger = LoggerFactory.getLogger(TokenVerifier.class);

private final Vertx vertx;

private final OIDCDiscoveryConfig oidcDiscoveryConfig;

public TokenVerifier(Vertx vertx, OIDCDiscoveryConfig oidcDiscoveryConfig) {
this.vertx = vertx;
this.oidcDiscoveryConfig = oidcDiscoveryConfig;
}

public Future<JwtClaims> verify(String token, String expectedAudience) {
return this.vertx.<JwtClaims>executeBlocking(promise -> {
// execute blocking, as jose .process() is blocking

JwtConsumer jwtConsumer = new JwtConsumerBuilder()
.setVerificationKeyResolver(this.oidcDiscoveryConfig.getJwksVerificationKeyResolver())
.setExpectedAudience(expectedAudience)
.setExpectedIssuer(this.oidcDiscoveryConfig.getIssuer())
.build();

try {
JwtContext jwtContext = jwtConsumer.process(token);

promise.complete(jwtContext.getJwtClaims());
} catch (InvalidJwtException e) {
promise.fail(e);
}
});
}

public Future<JwtClaims> verify(HttpServerRequest request, String expectedAudience) {
String authHeader = request.getHeader("Authorization");
if (authHeader.isEmpty()) {
return Future.failedFuture("Request didn't contain Authorization header"); // change to exception
}

if (!authHeader.startsWith("Bearer ") && authHeader.length() <= "Bearer ".length()) {
return Future.failedFuture("Authorization header didn't contain Bearer token"); // change to exception
}

String token = authHeader.substring("Bearer ".length());
logger.debug("Extracted token \"{}\" from request auth header \"{}\"", token, authHeader);

return verify(token, expectedAudience);
}
}
7 changes: 7 additions & 0 deletions data-plane/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
<antlr.version>4.9.2
</antlr.version> <!-- Overwritting quarkus's antlr version. Reminder: antlr4-maven-plugin,antlr4-runtime, antlr4 need to have the same version -->
<palantirJavaFormat.version>2.38.0</palantirJavaFormat.version>
<jose4j.version>0.9.4</jose4j.version>
</properties>

<modules>
Expand Down Expand Up @@ -242,6 +243,12 @@
<scope>import</scope>
</dependency>

<dependency>
<groupId>org.bitbucket.b_c</groupId>
<artifactId>jose4j</artifactId>
<version>${jose4j.version}</version>
</dependency>

<!-- Logback -->
<dependency>
<groupId>org.slf4j</groupId>
Expand Down
Loading

0 comments on commit 6083dbf

Please sign in to comment.