diff --git a/build.gradle b/build.gradle
index e3259a8df342e1..2984812bda13b8 100644
--- a/build.gradle
+++ b/build.gradle
@@ -39,13 +39,13 @@ buildscript {
ext.springBootVersion = '3.2.9'
ext.springKafkaVersion = '3.1.6'
ext.openTelemetryVersion = '1.18.0'
- ext.neo4jVersion = '5.14.0'
- ext.neo4jTestVersion = '5.14.0'
- ext.neo4jApocVersion = '5.14.0'
+ ext.neo4jVersion = '5.20.0'
+ ext.neo4jTestVersion = '5.20.0'
+ ext.neo4jApocVersion = '5.20.0'
ext.testContainersVersion = '1.17.4'
ext.elasticsearchVersion = '2.11.1' // ES 7.10, Opensearch 1.x, 2.x
ext.jacksonVersion = '2.15.3'
- ext.jettyVersion = '11.0.21'
+ ext.jettyVersion = '12.0.16'
// see also datahub-frontend/play.gradle
ext.playVersion = '2.8.22'
ext.playScalaVersion = '2.13'
@@ -136,7 +136,8 @@ project.ext.externalDependency = [
'datastaxOssNativeProtocol': 'com.datastax.oss:native-protocol:1.5.1',
'datastaxOssCore': 'com.datastax.oss:java-driver-core:4.14.1',
'datastaxOssQueryBuilder': 'com.datastax.oss:java-driver-query-builder:4.14.1',
- 'dgraph4j' : 'io.dgraph:dgraph4j:21.12.0',
+ 'dgraph4j' : 'io.dgraph:dgraph4j:24.1.1',
+ 'dgraphNetty': 'io.grpc:grpc-netty-shaded:1.69.0',
'dropwizardMetricsCore': 'io.dropwizard.metrics:metrics-core:4.2.3',
'dropwizardMetricsJmx': 'io.dropwizard.metrics:metrics-jmx:4.2.3',
'ebean': 'io.ebean:ebean:' + ebeanVersion,
@@ -176,8 +177,9 @@ project.ext.externalDependency = [
'jakartaValidation': 'jakarta.validation:jakarta.validation-api:3.1.0-M2',
'jerseyCore': 'org.glassfish.jersey.core:jersey-client:2.41',
'jerseyGuava': 'org.glassfish.jersey.bundles.repackaged:jersey-guava:2.25.1',
- 'jettyJaas': "org.eclipse.jetty:jetty-jaas:$jettyVersion",
+ 'jettySecurity': "org.eclipse.jetty:jetty-security:$jettyVersion",
'jettyClient': "org.eclipse.jetty:jetty-client:$jettyVersion",
+ 'jettyJmx': "org.eclipse.jetty:jetty-jmx:$jettyVersion",
'jettison': 'org.codehaus.jettison:jettison:1.5.4',
'jgrapht': 'org.jgrapht:jgrapht-core:1.5.1',
'jna': 'net.java.dev.jna:jna:5.12.1',
@@ -380,6 +382,13 @@ configure(subprojects.findAll {! it.name.startsWith('spark-lineage')}) {
resolutionStrategy.force externalDependency.antlr4Runtime
resolutionStrategy.force externalDependency.antlr4
resolutionStrategy.force 'org.apache.mina:mina-core:2.2.4'
+ resolutionStrategy {
+ force "org.eclipse.jetty:jetty-security:${jettyVersion}"
+ force "org.eclipse.jetty:jetty-server:${jettyVersion}"
+ force "org.eclipse.jetty:jetty-ee10-servlet:${jettyVersion}"
+ force "org.eclipse.jetty:jetty-ee10-webapp:${jettyVersion}"
+ force "org.eclipse.jetty:jetty-xml:${jettyVersion}"
+ }
}
}
@@ -407,7 +416,7 @@ subprojects {
implementation externalDependency.annotationApi
constraints {
implementation("com.google.googlejavaformat:google-java-format:$googleJavaFormatVersion")
- implementation('io.netty:netty-all:4.1.115.Final')
+ implementation('io.netty:netty-all:4.1.116.Final')
implementation('org.apache.commons:commons-compress:1.27.1')
implementation('org.apache.velocity:velocity-engine-core:2.4')
implementation('org.hibernate:hibernate-validator:6.0.20.Final')
diff --git a/datahub-frontend/app/security/AuthenticationManager.java b/datahub-frontend/app/security/AuthenticationManager.java
index f46dc57c232bd2..8e7d51a0776c23 100644
--- a/datahub-frontend/app/security/AuthenticationManager.java
+++ b/datahub-frontend/app/security/AuthenticationManager.java
@@ -1,68 +1,33 @@
package security;
import com.google.common.base.Preconditions;
-import java.util.Collections;
import javax.annotation.Nonnull;
import javax.naming.AuthenticationException;
-import javax.security.auth.callback.Callback;
-import javax.security.auth.callback.CallbackHandler;
-import javax.security.auth.callback.NameCallback;
-import javax.security.auth.callback.PasswordCallback;
-import javax.security.auth.login.LoginContext;
-import javax.security.auth.login.LoginException;
import org.apache.commons.lang3.StringUtils;
-import org.eclipse.jetty.jaas.JAASLoginService;
-import org.eclipse.jetty.jaas.PropertyUserStoreManager;
-import play.Logger;
+import org.eclipse.jetty.security.UserPrincipal;
+import org.eclipse.jetty.util.security.Credential;
public class AuthenticationManager {
-
- private AuthenticationManager(boolean verbose) {}
+ private AuthenticationManager() {} // Prevent instantiation
public static void authenticateJaasUser(@Nonnull String userName, @Nonnull String password)
throws Exception {
Preconditions.checkArgument(!StringUtils.isAnyEmpty(userName), "Username cannot be empty");
- JAASLoginService jaasLoginService = new JAASLoginService("WHZ-Authentication");
- PropertyUserStoreManager propertyUserStoreManager = new PropertyUserStoreManager();
- propertyUserStoreManager.start();
- jaasLoginService.setBeans(Collections.singletonList(propertyUserStoreManager));
- JAASLoginService.INSTANCE.set(jaasLoginService);
- try {
- LoginContext lc =
- new LoginContext("WHZ-Authentication", new WHZCallbackHandler(userName, password));
- lc.login();
- } catch (LoginException le) {
- AuthenticationException authenticationException =
- new AuthenticationException(le.getMessage());
- authenticationException.setRootCause(le);
- throw authenticationException;
- }
- }
- private static class WHZCallbackHandler implements CallbackHandler {
- private String password;
- private String username;
-
- private WHZCallbackHandler(@Nonnull String username, @Nonnull String password) {
- this.username = username;
- this.password = password;
- }
+ try {
+ // Create and configure credentials for authentication
+ UserPrincipal userPrincipal = new UserPrincipal(userName, Credential.getCredential(password));
- @Override
- public void handle(@Nonnull Callback[] callbacks) {
- NameCallback nc = null;
- PasswordCallback pc = null;
- for (Callback callback : callbacks) {
- Logger.debug(
- "The submitted callback is of type: " + callback.getClass() + " : " + callback);
- if (callback instanceof NameCallback) {
- nc = (NameCallback) callback;
- nc.setName(this.username);
- } else if (callback instanceof PasswordCallback) {
- pc = (PasswordCallback) callback;
- pc.setPassword(this.password.toCharArray());
- }
+ // Verify credentials
+ if (!userPrincipal.authenticate(password)) {
+ throw new AuthenticationException("Invalid credentials for user: " + userName);
}
+
+ } catch (Exception e) {
+ AuthenticationException authenticationException =
+ new AuthenticationException("Authentication failed");
+ authenticationException.setRootCause(e);
+ throw authenticationException;
}
}
}
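
For reference, a minimal standalone sketch (not part of the patch) of the Jetty 12 credential API that the rewritten `authenticateJaasUser` relies on; the user name and password literals are placeholders:

```java
// Sketch: verifying a password with Jetty 12's security primitives
// (org.eclipse.jetty.security / org.eclipse.jetty.util.security).
import org.eclipse.jetty.security.UserPrincipal;
import org.eclipse.jetty.util.security.Credential;

public class JettyCredentialCheckExample {
  public static void main(String[] args) {
    // Credential.getCredential() returns a plain Password credential unless the
    // stored value carries an OBF:, MD5: or CRYPT: prefix.
    Credential stored = Credential.getCredential("letmein");
    UserPrincipal principal = new UserPrincipal("datahub", stored);

    System.out.println(principal.authenticate("letmein")); // true
    System.out.println(principal.authenticate("wrong"));   // false
  }
}
```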
diff --git a/datahub-frontend/play.gradle b/datahub-frontend/play.gradle
index d513c3c232d9a0..1a9ffeede56251 100644
--- a/datahub-frontend/play.gradle
+++ b/datahub-frontend/play.gradle
@@ -50,7 +50,7 @@ dependencies {
implementation externalDependency.springBeans
implementation externalDependency.springContext
implementation externalDependency.springBootAutoconfigure
- implementation externalDependency.jettyJaas
+ implementation externalDependency.jettySecurity
implementation externalDependency.graphqlJava
implementation externalDependency.antlr4Runtime
implementation externalDependency.antlr4
diff --git a/docker/datahub-gms/Dockerfile b/docker/datahub-gms/Dockerfile
index 52cc507f9268d1..5462d4f70002c1 100644
--- a/docker/datahub-gms/Dockerfile
+++ b/docker/datahub-gms/Dockerfile
@@ -26,7 +26,6 @@ RUN go install github.com/jwilder/dockerize@$DOCKERIZE_VERSION
FROM alpine:3.21 AS base
ENV JMX_VERSION=0.18.0
-ENV JETTY_VERSION=11.0.21
# Re-declaring args from above to make them available in this stage (will inherit default values)
ARG ALPINE_REPO_URL
@@ -42,9 +41,6 @@ RUN apk --no-cache --update-cache --available upgrade \
&& apk --no-cache add curl bash coreutils gcompat sqlite libc6-compat snappy \
&& apk --no-cache add openjdk17-jre-headless --repository=${ALPINE_REPO_URL}/edge/community \
&& apk --no-cache add jattach --repository ${ALPINE_REPO_URL}/edge/community/ \
- && curl -sS ${MAVEN_CENTRAL_REPO_URL}/org/eclipse/jetty/jetty-runner/${JETTY_VERSION}/jetty-runner-${JETTY_VERSION}.jar --output jetty-runner.jar \
- && curl -sS ${MAVEN_CENTRAL_REPO_URL}/org/eclipse/jetty/jetty-jmx/${JETTY_VERSION}/jetty-jmx-${JETTY_VERSION}.jar --output jetty-jmx.jar \
- && curl -sS ${MAVEN_CENTRAL_REPO_URL}/org/eclipse/jetty/jetty-util/${JETTY_VERSION}/jetty-util-${JETTY_VERSION}.jar --output jetty-util.jar \
&& wget --no-verbose ${GITHUB_REPO_URL}/open-telemetry/opentelemetry-java-instrumentation/releases/download/v1.24.0/opentelemetry-javaagent.jar \
&& wget --no-verbose ${MAVEN_CENTRAL_REPO_URL}/io/prometheus/jmx/jmx_prometheus_javaagent/${JMX_VERSION}/jmx_prometheus_javaagent-${JMX_VERSION}.jar -O jmx_prometheus_javaagent.jar \
&& cp /usr/lib/jvm/java-17-openjdk/jre/lib/security/cacerts /tmp/kafka.client.truststore.jks
@@ -56,8 +52,6 @@ FROM base AS prod-install
COPY war.war /datahub/datahub-gms/bin/war.war
COPY metadata-models/src/main/resources/entity-registry.yml /datahub/datahub-gms/resources/entity-registry.yml
COPY docker/datahub-gms/start.sh /datahub/datahub-gms/scripts/start.sh
-COPY docker/datahub-gms/jetty.xml /datahub/datahub-gms/scripts/jetty.xml
-COPY docker/datahub-gms/jetty-jmx.xml /datahub/datahub-gms/scripts/jetty-jmx.xml
COPY docker/monitoring/client-prometheus-config.yaml /datahub/datahub-gms/scripts/prometheus-config.yaml
RUN chmod +x /datahub/datahub-gms/scripts/start.sh
@@ -70,7 +64,7 @@ FROM ${APP_ENV}-install AS final
RUN mkdir -p /etc/datahub/plugins/auth/resources
RUN addgroup -S datahub && adduser -S datahub -G datahub
-RUN chown -R datahub:datahub /etc/datahub
+RUN chown -R datahub:datahub /etc/datahub /datahub
USER datahub
ENV JMX_OPTS=""
diff --git a/docker/datahub-gms/jetty-jmx.xml b/docker/datahub-gms/jetty-jmx.xml
deleted file mode 100644
index 5aadbb66a70ed1..00000000000000
--- a/docker/datahub-gms/jetty-jmx.xml
+++ /dev/null
@@ -1,31 +0,0 @@
- [31 lines of Jetty JMX XML configuration elided]
diff --git a/docker/datahub-gms/jetty.xml b/docker/datahub-gms/jetty.xml
deleted file mode 100644
index 3f04635d9498ca..00000000000000
--- a/docker/datahub-gms/jetty.xml
+++ /dev/null
@@ -1,57 +0,0 @@
- [57 lines of Jetty server XML configuration elided]
diff --git a/docker/datahub-gms/start.sh b/docker/datahub-gms/start.sh
index c91580eed83cb9..cac36920022749 100755
--- a/docker/datahub-gms/start.sh
+++ b/docker/datahub-gms/start.sh
@@ -62,13 +62,8 @@ COMMON="
java $JAVA_OPTS $JMX_OPTS \
$OTEL_AGENT \
$PROMETHEUS_AGENT \
- -jar /jetty-runner.jar \
- --stats unsecure \
- --jar jetty-util.jar \
- --jar jetty-jmx.jar \
- --config /datahub/datahub-gms/scripts/jetty.xml \
- --config /datahub/datahub-gms/scripts/jetty-jmx.xml \
- /datahub/datahub-gms/bin/war.war"
+ -Dstats=unsecure \
+ -jar /datahub/datahub-gms/bin/war.war"
if [[ $SKIP_ELASTICSEARCH_CHECK != true ]]; then
exec dockerize \
diff --git a/docker/docker-compose.dev.yml b/docker/docker-compose.dev.yml
index c68a4c1f5a8fcf..0cbb2aee903c84 100644
--- a/docker/docker-compose.dev.yml
+++ b/docker/docker-compose.dev.yml
@@ -49,8 +49,6 @@ services:
- KAFKA_CONSUMER_STOP_ON_DESERIALIZATION_ERROR=${KAFKA_CONSUMER_STOP_ON_DESERIALIZATION_ERROR:-true}
volumes:
- ./datahub-gms/start.sh:/datahub/datahub-gms/scripts/start.sh
- - ./datahub-gms/jetty.xml:/datahub/datahub-gms/scripts/jetty.xml
- - ./datahub-gms/jetty-jmx.xml:/datahub/datahub-gms/scripts/jetty-jmx.xml
- ./monitoring/client-prometheus-config.yaml:/datahub/datahub-gms/scripts/prometheus-config.yaml
- ../metadata-models/src/main/resources/:/datahub/datahub-gms/resources
- ../metadata-service/war/build/libs/:/datahub/datahub-gms/bin
diff --git a/docker/profiles/docker-compose.frontend.yml b/docker/profiles/docker-compose.frontend.yml
index c6b15a7016670d..b278cd41b0b231 100644
--- a/docker/profiles/docker-compose.frontend.yml
+++ b/docker/profiles/docker-compose.frontend.yml
@@ -26,6 +26,7 @@ x-datahub-frontend-service-dev: &datahub-frontend-service-dev
DATAHUB_ANALYTICS_ENABLED: ${DATAHUB_ANALYTICS_ENABLED:-true}
volumes:
- ../../datahub-frontend/build/stage/main:/datahub-frontend
+ - ./monitoring/client-prometheus-config.yaml:/datahub-frontend/client-prometheus-config.yaml
services:
frontend-quickstart:
diff --git a/docker/profiles/docker-compose.gms.yml b/docker/profiles/docker-compose.gms.yml
index 2147d6b5a0247f..d4ea7dde9f8481 100644
--- a/docker/profiles/docker-compose.gms.yml
+++ b/docker/profiles/docker-compose.gms.yml
@@ -130,15 +130,13 @@ x-datahub-gms-service-dev: &datahub-gms-service-dev
<<: [*datahub-dev-telemetry-env, *datahub-gms-env]
SKIP_ELASTICSEARCH_CHECK: false
JAVA_TOOL_OPTIONS: '-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=*:5001'
- BOOTSTRAP_SYSTEM_UPDATE_WAIT_FOR_SYSTEM_UPDATE: false
+ BOOTSTRAP_SYSTEM_UPDATE_WAIT_FOR_SYSTEM_UPDATE: ${BOOTSTRAP_SYSTEM_UPDATE_WAIT_FOR_SYSTEM_UPDATE:-false}
SEARCH_SERVICE_ENABLE_CACHE: false
LINEAGE_SEARCH_CACHE_ENABLED: false
SHOW_BROWSE_V2: true
ENTITY_VERSIONING_ENABLED: ${ENTITY_VERSIONING_ENABLED:-true}
volumes:
- ./datahub-gms/start.sh:/datahub/datahub-gms/scripts/start.sh
- - ./datahub-gms/jetty.xml:/datahub/datahub-gms/scripts/jetty.xml
- - ./datahub-gms/jetty-jmx.xml:/datahub/datahub-gms/scripts/jetty-jmx.xml
- ./monitoring/client-prometheus-config.yaml:/datahub/datahub-gms/scripts/prometheus-config.yaml
- ../../metadata-models/src/main/resources/:/datahub/datahub-gms/resources
- ../../metadata-service/war/build/libs/:/datahub/datahub-gms/bin
diff --git a/metadata-integration/java/acryl-spark-lineage/build.gradle b/metadata-integration/java/acryl-spark-lineage/build.gradle
index 8816264fbe50f7..c8b78f25d70992 100644
--- a/metadata-integration/java/acryl-spark-lineage/build.gradle
+++ b/metadata-integration/java/acryl-spark-lineage/build.gradle
@@ -57,7 +57,13 @@ dependencies {
//implementation "io.acryl:datahub-client:0.10.2"
implementation "io.openlineage:openlineage-spark_2.12:$openLineageVersion"
compileOnly "org.apache.iceberg:iceberg-spark3-runtime:0.12.1"
- compileOnly "org.apache.spark:spark-sql_2.12:3.1.3"
+ compileOnly("org.apache.spark:spark-sql_2.12:3.1.3") {
+ exclude group: 'org.eclipse.jetty', module: 'jetty-servlet'
+ exclude group: 'org.eclipse.jetty', module: 'jetty-server'
+ exclude group: 'org.eclipse.jetty', module: 'jetty-util'
+ exclude group: 'org.eclipse.jetty', module: 'jetty-webapp'
+ exclude group: 'org.eclipse.jetty', module: 'jetty-security'
+ }
compileOnly "io.github.spark-redshift-community:spark-redshift_2.12:6.2.0-spark_3.5"
testCompileOnly externalDependency.lombok
diff --git a/metadata-io/build.gradle b/metadata-io/build.gradle
index aab29101b30f71..b33f19bef95986 100644
--- a/metadata-io/build.gradle
+++ b/metadata-io/build.gradle
@@ -37,6 +37,7 @@ dependencies {
exclude group: 'com.google.guava', module: 'guava'
exclude group: 'io.grpc', module: 'grpc-protobuf'
}
+ implementation externalDependency.dgraphNetty
implementation externalDependency.slf4jApi
runtimeOnly externalDependency.logbackClassic
compileOnly externalDependency.lombok
diff --git a/metadata-io/src/test/java/com/linkedin/metadata/graph/neo4j/Neo4jTestServerBuilder.java b/metadata-io/src/test/java/com/linkedin/metadata/graph/neo4j/Neo4jTestServerBuilder.java
index fa04de340e12f7..b34730f481c63b 100644
--- a/metadata-io/src/test/java/com/linkedin/metadata/graph/neo4j/Neo4jTestServerBuilder.java
+++ b/metadata-io/src/test/java/com/linkedin/metadata/graph/neo4j/Neo4jTestServerBuilder.java
@@ -18,11 +18,11 @@ private Neo4jTestServerBuilder(Neo4jBuilder builder) {
}
public Neo4jTestServerBuilder() {
- this(new InProcessNeo4jBuilder().withProcedure(PathExplorer.class));
+ this(new InProcessNeo4jBuilder().withProcedure(PathExplorer.class).withDisabledServer());
}
public Neo4jTestServerBuilder(File workingDirectory) {
- this(new InProcessNeo4jBuilder(workingDirectory.toPath()));
+ this(new InProcessNeo4jBuilder(workingDirectory.toPath()).withDisabledServer());
}
public Neo4j newServer() {
diff --git a/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/DataHubUsageEventsProcessor.java b/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/DataHubUsageEventsProcessor.java
index ce7376f1f8d662..d699f0bff68019 100644
--- a/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/DataHubUsageEventsProcessor.java
+++ b/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/DataHubUsageEventsProcessor.java
@@ -49,7 +49,8 @@ public DataHubUsageEventsProcessor(
@KafkaListener(
id = "${DATAHUB_USAGE_EVENT_KAFKA_CONSUMER_GROUP_ID:datahub-usage-event-consumer-job-client}",
topics = "${DATAHUB_USAGE_EVENT_NAME:" + Topics.DATAHUB_USAGE_EVENT + "}",
- containerFactory = "simpleKafkaConsumer")
+ containerFactory = "simpleKafkaConsumer",
+ autoStartup = "false")
public void consume(final ConsumerRecord consumerRecord) {
try (Timer.Context i = MetricUtils.timer(this.getClass(), "consume").time()) {
kafkaLagStats.update(System.currentTimeMillis() - consumerRecord.timestamp());
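
Every listener touched by this patch is switched to `autoStartup = "false"`; the containers are started later through the `KafkaListenerEndpointRegistry` (see `KafkaInitializationManager` further down). A minimal sketch of that pattern, with hypothetical topic and listener ids:

```java
// Sketch: a Kafka listener that does not start with the application context and
// is started explicitly via the endpoint registry (hypothetical names).
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.stereotype.Component;

@Component
public class DeferredConsumerExample {

  @Autowired private KafkaListenerEndpointRegistry registry;

  @KafkaListener(id = "example-consumer", topics = "example-topic", autoStartup = "false")
  public void consume(ConsumerRecord<String, String> record) {
    // handle the record
  }

  /** Invoked once the application is ready to accept traffic. */
  public void startConsumer() {
    MessageListenerContainer container = registry.getListenerContainer("example-consumer");
    if (container != null && !container.isRunning()) {
      container.start();
    }
  }
}
```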
diff --git a/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/MCLKafkaListenerRegistrar.java b/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/MCLKafkaListenerRegistrar.java
index c909b0034a9125..20c044b42741e8 100644
--- a/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/MCLKafkaListenerRegistrar.java
+++ b/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/MCLKafkaListenerRegistrar.java
@@ -73,7 +73,7 @@ public void afterPropertiesSet() {
buildConsumerGroupName(key),
List.of(mclVersionedTopicName, mclTimeseriesTopicName),
hooks);
- registerMCLKafkaListener(kafkaListenerEndpoint, true);
+ registerMCLKafkaListener(kafkaListenerEndpoint, false);
});
}
@@ -97,7 +97,7 @@ private KafkaListenerEndpoint createListenerEndpoint(
new MethodKafkaListenerEndpoint<>();
kafkaListenerEndpoint.setId(consumerGroupId);
kafkaListenerEndpoint.setGroupId(consumerGroupId);
- kafkaListenerEndpoint.setAutoStartup(true);
+ kafkaListenerEndpoint.setAutoStartup(false);
kafkaListenerEndpoint.setTopics(topics.toArray(new String[topics.size()]));
kafkaListenerEndpoint.setMessageHandlerMethodFactory(new DefaultMessageHandlerMethodFactory());
kafkaListenerEndpoint.setBean(
diff --git a/metadata-jobs/mce-consumer/src/main/java/com/linkedin/metadata/kafka/MetadataChangeEventsProcessor.java b/metadata-jobs/mce-consumer/src/main/java/com/linkedin/metadata/kafka/MetadataChangeEventsProcessor.java
index 5d2f6452e69197..2152ed15cf0e93 100644
--- a/metadata-jobs/mce-consumer/src/main/java/com/linkedin/metadata/kafka/MetadataChangeEventsProcessor.java
+++ b/metadata-jobs/mce-consumer/src/main/java/com/linkedin/metadata/kafka/MetadataChangeEventsProcessor.java
@@ -62,7 +62,8 @@ public class MetadataChangeEventsProcessor {
"${METADATA_CHANGE_EVENT_NAME:${KAFKA_MCE_TOPIC_NAME:"
+ Topics.METADATA_CHANGE_EVENT
+ "}}",
- containerFactory = DEFAULT_EVENT_CONSUMER_NAME)
+ containerFactory = DEFAULT_EVENT_CONSUMER_NAME,
+ autoStartup = "false")
@Deprecated
public void consume(final ConsumerRecord consumerRecord) {
try (Timer.Context i = MetricUtils.timer(this.getClass(), "consume").time()) {
diff --git a/metadata-jobs/mce-consumer/src/main/java/com/linkedin/metadata/kafka/MetadataChangeProposalsProcessor.java b/metadata-jobs/mce-consumer/src/main/java/com/linkedin/metadata/kafka/MetadataChangeProposalsProcessor.java
index 4e356f5fb3670a..d854a5517793ff 100644
--- a/metadata-jobs/mce-consumer/src/main/java/com/linkedin/metadata/kafka/MetadataChangeProposalsProcessor.java
+++ b/metadata-jobs/mce-consumer/src/main/java/com/linkedin/metadata/kafka/MetadataChangeProposalsProcessor.java
@@ -80,7 +80,8 @@ public void registerConsumerThrottle() {
@KafkaListener(
id = CONSUMER_GROUP_ID_VALUE,
topics = "${METADATA_CHANGE_PROPOSAL_TOPIC_NAME:" + Topics.METADATA_CHANGE_PROPOSAL + "}",
- containerFactory = MCP_EVENT_CONSUMER_NAME)
+ containerFactory = MCP_EVENT_CONSUMER_NAME,
+ autoStartup = "false")
public void consume(final ConsumerRecord consumerRecord) {
try (Timer.Context ignored = MetricUtils.timer(this.getClass(), "consume").time()) {
kafkaLagStats.update(System.currentTimeMillis() - consumerRecord.timestamp());
diff --git a/metadata-jobs/mce-consumer/src/main/java/com/linkedin/metadata/kafka/batch/BatchMetadataChangeProposalsProcessor.java b/metadata-jobs/mce-consumer/src/main/java/com/linkedin/metadata/kafka/batch/BatchMetadataChangeProposalsProcessor.java
index fed93628fe4d79..5ee9cd6ba94d2f 100644
--- a/metadata-jobs/mce-consumer/src/main/java/com/linkedin/metadata/kafka/batch/BatchMetadataChangeProposalsProcessor.java
+++ b/metadata-jobs/mce-consumer/src/main/java/com/linkedin/metadata/kafka/batch/BatchMetadataChangeProposalsProcessor.java
@@ -73,7 +73,8 @@ public void registerConsumerThrottle() {
id = CONSUMER_GROUP_ID_VALUE,
topics = "${METADATA_CHANGE_PROPOSAL_TOPIC_NAME:" + Topics.METADATA_CHANGE_PROPOSAL + "}",
containerFactory = "kafkaEventConsumer",
- batch = "true")
+ batch = "true",
+ autoStartup = "false")
public void consume(final List> consumerRecords) {
try (Timer.Context ignored = MetricUtils.timer(this.getClass(), "consume").time()) {
List metadataChangeProposals =
diff --git a/metadata-jobs/pe-consumer/src/main/java/com/datahub/event/PlatformEventProcessor.java b/metadata-jobs/pe-consumer/src/main/java/com/datahub/event/PlatformEventProcessor.java
index 5d11697bed93d2..2befeccb951a38 100644
--- a/metadata-jobs/pe-consumer/src/main/java/com/datahub/event/PlatformEventProcessor.java
+++ b/metadata-jobs/pe-consumer/src/main/java/com/datahub/event/PlatformEventProcessor.java
@@ -58,7 +58,8 @@ public PlatformEventProcessor(
@KafkaListener(
id = "${PLATFORM_EVENT_KAFKA_CONSUMER_GROUP_ID:generic-platform-event-job-client}",
topics = {"${PLATFORM_EVENT_TOPIC_NAME:" + Topics.PLATFORM_EVENT + "}"},
- containerFactory = PE_EVENT_CONSUMER_NAME)
+ containerFactory = PE_EVENT_CONSUMER_NAME,
+ autoStartup = "false")
public void consume(final ConsumerRecord consumerRecord) {
try (Timer.Context i = MetricUtils.timer(this.getClass(), "consume").time()) {
diff --git a/metadata-service/auth-config/src/main/java/com/datahub/authentication/AuthenticationConfiguration.java b/metadata-service/auth-config/src/main/java/com/datahub/authentication/AuthenticationConfiguration.java
index 81cc5e60552a77..d258fcfeb65750 100644
--- a/metadata-service/auth-config/src/main/java/com/datahub/authentication/AuthenticationConfiguration.java
+++ b/metadata-service/auth-config/src/main/java/com/datahub/authentication/AuthenticationConfiguration.java
@@ -12,6 +12,9 @@ public class AuthenticationConfiguration {
/** Whether user existence is enforced */
private boolean enforceExistenceEnabled;
+ /** Paths to be excluded from filtering */
+ private String excludedPaths;
+
/**
* List of configurations for {@link com.datahub.plugins.auth.authentication.Authenticator}s to be
* registered
diff --git a/metadata-service/auth-filter/src/main/java/com/datahub/auth/authentication/filter/AuthenticationFilter.java b/metadata-service/auth-filter/src/main/java/com/datahub/auth/authentication/filter/AuthenticationFilter.java
index 30f98180f80180..492e165c0781a0 100644
--- a/metadata-service/auth-filter/src/main/java/com/datahub/auth/authentication/filter/AuthenticationFilter.java
+++ b/metadata-service/auth-filter/src/main/java/com/datahub/auth/authentication/filter/AuthenticationFilter.java
@@ -25,38 +25,43 @@
import com.datahub.plugins.factory.PluginConfigFactory;
import com.datahub.plugins.loader.IsolatedClassLoader;
import com.datahub.plugins.loader.PluginPermissionManagerImpl;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
import com.linkedin.gms.factory.config.ConfigurationProvider;
import com.linkedin.metadata.entity.EntityService;
import jakarta.inject.Named;
import jakarta.servlet.Filter;
import jakarta.servlet.FilterChain;
-import jakarta.servlet.FilterConfig;
import jakarta.servlet.ServletException;
-import jakarta.servlet.ServletRequest;
-import jakarta.servlet.ServletResponse;
import jakarta.servlet.http.HttpServletRequest;
import jakarta.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.nio.file.Path;
import java.nio.file.Paths;
+import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.Set;
import java.util.stream.Collectors;
+import javax.annotation.PostConstruct;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
-import org.springframework.web.context.support.SpringBeanAutowiringSupport;
+import org.springframework.stereotype.Component;
+import org.springframework.util.StringUtils;
+import org.springframework.web.filter.OncePerRequestFilter;
/**
* A servlet {@link Filter} for authenticating requests inbound to the Metadata Service. This filter
* is applied to the GraphQL Servlet, the Rest.li Servlet, and the Auth (token) Servlet.
*/
+@Component
@Slf4j
-public class AuthenticationFilter implements Filter {
+public class AuthenticationFilter extends OncePerRequestFilter {
@Autowired private ConfigurationProvider configurationProvider;
@@ -72,18 +77,32 @@ public class AuthenticationFilter implements Filter {
private boolean _logAuthenticatorExceptions;
private AuthenticatorChain authenticatorChain;
+ private Set<String> excludedPathPatterns;
- @Override
- public void init(FilterConfig filterConfig) throws ServletException {
- SpringBeanAutowiringSupport.processInjectionBasedOnCurrentContext(this);
+ @PostConstruct
+ public void init() {
buildAuthenticatorChain();
+ initializeExcludedPaths();
log.info("AuthenticationFilter initialized.");
}
+ private void initializeExcludedPaths() {
+ excludedPathPatterns = new HashSet<>();
+ String excludedPaths = configurationProvider.getAuthentication().getExcludedPaths();
+ if (StringUtils.hasText(excludedPaths)) {
+ excludedPathPatterns.addAll(
+ Arrays.stream(excludedPaths.split(","))
+ .map(String::trim)
+ .filter(path -> !path.isBlank())
+ .toList());
+ }
+ }
+
@Override
- public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain)
- throws IOException, ServletException {
- AuthenticationRequest context = buildAuthContext((HttpServletRequest) request);
+ protected void doFilterInternal(
+ HttpServletRequest request, HttpServletResponse response, FilterChain chain)
+ throws ServletException, IOException {
+ AuthenticationRequest context = buildAuthContext(request);
Authentication authentication = null;
try {
authentication = this.authenticatorChain.authenticate(context, _logAuthenticatorExceptions);
@@ -119,6 +138,34 @@ public void doFilter(ServletRequest request, ServletResponse response, FilterCha
AuthenticationContext.remove();
}
+ @VisibleForTesting
+ @Override
+ public boolean shouldNotFilter(HttpServletRequest request) {
+ String path = request.getServletPath();
+ if (path == null) {
+ return false;
+ }
+
+ // Check if the path matches any of the excluded patterns
+ boolean shouldExclude =
+ excludedPathPatterns.stream()
+ .anyMatch(
+ pattern -> {
+ if (pattern.endsWith("/*")) {
+ // Handle wildcard patterns
+ String basePattern = pattern.substring(0, pattern.length() - 2);
+ return path.startsWith(basePattern);
+ }
+ return path.equals(pattern);
+ });
+
+ if (shouldExclude) {
+ log.debug("Skipping authentication for excluded path: {}", path);
+ }
+
+ return shouldExclude;
+ }
+
@Override
public void destroy() {
// Nothing
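
The exclusion logic above is deliberately simple: patterns ending in `/*` are prefix matches and everything else must match exactly. A standalone sketch of the same rule, exercised with paths from the default `excludedPaths` configuration added later in this patch:

```java
// Sketch: the prefix/exact matching rule implemented by shouldNotFilter().
import java.util.Set;

public class ExcludedPathMatcherExample {
  static boolean isExcluded(String path, Set<String> patterns) {
    return patterns.stream()
        .anyMatch(
            p ->
                p.endsWith("/*")
                    ? path.startsWith(p.substring(0, p.length() - 2))
                    : path.equals(p));
  }

  public static void main(String[] args) {
    Set<String> patterns = Set.of("/health", "/schema-registry/*");
    System.out.println(isExcluded("/health", patterns)); // true
    System.out.println(isExcluded("/schema-registry/api/config", patterns)); // true
    System.out.println(isExcluded("/protected/resource", patterns)); // false
  }
}
```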
diff --git a/metadata-service/auth-filter/src/test/java/com/datahub/auth/authentication/AuthenticationFilterTest.java b/metadata-service/auth-filter/src/test/java/com/datahub/auth/authentication/AuthenticationFilterTest.java
index 382e881542b0b8..0ffabb9db305dd 100644
--- a/metadata-service/auth-filter/src/test/java/com/datahub/auth/authentication/AuthenticationFilterTest.java
+++ b/metadata-service/auth-filter/src/test/java/com/datahub/auth/authentication/AuthenticationFilterTest.java
@@ -2,6 +2,8 @@
import static com.datahub.authentication.AuthenticationConstants.*;
import static org.mockito.Mockito.*;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
import com.datahub.auth.authentication.filter.AuthenticationFilter;
import com.datahub.authentication.Actor;
@@ -9,27 +11,34 @@
import com.datahub.authentication.token.StatefulTokenService;
import com.datahub.authentication.token.TokenException;
import jakarta.servlet.FilterChain;
+import jakarta.servlet.FilterConfig;
import jakarta.servlet.ServletException;
import jakarta.servlet.http.HttpServletRequest;
import jakarta.servlet.http.HttpServletResponse;
import java.io.IOException;
+import java.util.Arrays;
import java.util.Collections;
+import java.util.HashSet;
import java.util.List;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.testng.AbstractTestNGSpringContextTests;
+import org.springframework.test.util.ReflectionTestUtils;
import org.testng.annotations.Test;
@ContextConfiguration(classes = {AuthTestConfiguration.class})
public class AuthenticationFilterTest extends AbstractTestNGSpringContextTests {
- @Autowired AuthenticationFilter _authenticationFilter;
+ @Autowired AuthenticationFilter authenticationFilter;
- @Autowired StatefulTokenService _statefulTokenService;
+ @Autowired StatefulTokenService statefulTokenService;
@Test
public void testExpiredToken() throws ServletException, IOException, TokenException {
- _authenticationFilter.init(null);
+ FilterConfig mockFilterConfig = mock(FilterConfig.class);
+ when(mockFilterConfig.getInitParameterNames()).thenReturn(Collections.emptyEnumeration());
+
+ authenticationFilter.init(mockFilterConfig);
HttpServletRequest servletRequest = mock(HttpServletRequest.class);
HttpServletResponse servletResponse = mock(HttpServletResponse.class);
FilterChain filterChain = mock(FilterChain.class);
@@ -46,8 +55,47 @@ public void testExpiredToken() throws ServletException, IOException, TokenExcept
.thenReturn(Collections.enumeration(List.of(AUTHORIZATION_HEADER_NAME)));
when(servletRequest.getHeader(AUTHORIZATION_HEADER_NAME)).thenReturn("Bearer " + token);
- _authenticationFilter.doFilter(servletRequest, servletResponse, filterChain);
+ authenticationFilter.doFilter(servletRequest, servletResponse, filterChain);
verify(servletResponse, times(1))
.sendError(eq(HttpServletResponse.SC_UNAUTHORIZED), anyString());
}
+
+ @Test
+ public void testExcludedPaths() throws ServletException {
+ // Mock configuration setup
+ FilterConfig mockFilterConfig = mock(FilterConfig.class);
+ when(mockFilterConfig.getInitParameterNames()).thenReturn(Collections.emptyEnumeration());
+ authenticationFilter.init(mockFilterConfig);
+
+ // Test cases for different path patterns
+ HttpServletRequest exactPathRequest = mock(HttpServletRequest.class);
+ when(exactPathRequest.getServletPath()).thenReturn("/health");
+
+ HttpServletRequest wildcardPathRequest = mock(HttpServletRequest.class);
+ when(wildcardPathRequest.getServletPath()).thenReturn("/schema-registry/api/config");
+
+ HttpServletRequest nonExcludedRequest = mock(HttpServletRequest.class);
+ when(nonExcludedRequest.getServletPath()).thenReturn("/protected/resource");
+
+ // Set excluded paths in the filter
+ ReflectionTestUtils.setField(
+ authenticationFilter,
+ "excludedPathPatterns",
+ new HashSet<>(Arrays.asList("/health", "/schema-registry/*")));
+
+ // Verify exact path match
+ assertTrue(
+ authenticationFilter.shouldNotFilter(exactPathRequest),
+ "Exact path match should be excluded from filtering");
+
+ // Verify wildcard path match
+ assertTrue(
+ authenticationFilter.shouldNotFilter(wildcardPathRequest),
+ "Path matching wildcard pattern should be excluded from filtering");
+
+ // Verify non-excluded path
+ assertFalse(
+ authenticationFilter.shouldNotFilter(nonExcludedRequest),
+ "Non-excluded path should not be excluded from filtering");
+ }
}
diff --git a/metadata-service/auth-servlet-impl/src/main/java/com/datahub/auth/authentication/AuthServiceController.java b/metadata-service/auth-servlet-impl/src/main/java/com/datahub/auth/authentication/AuthServiceController.java
index 5d4542cf0826e8..1f86c56049190d 100644
--- a/metadata-service/auth-servlet-impl/src/main/java/com/datahub/auth/authentication/AuthServiceController.java
+++ b/metadata-service/auth-servlet-impl/src/main/java/com/datahub/auth/authentication/AuthServiceController.java
@@ -35,11 +35,13 @@
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.PostMapping;
+import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.client.HttpClientErrorException;
@Slf4j
@RestController
+@RequestMapping("/auth")
public class AuthServiceController {
private static final String USER_ID_FIELD_NAME = "userId";
diff --git a/metadata-service/configuration/src/main/resources/application.yaml b/metadata-service/configuration/src/main/resources/application.yaml
index 69b86962442b91..04f7409d5c39a3 100644
--- a/metadata-service/configuration/src/main/resources/application.yaml
+++ b/metadata-service/configuration/src/main/resources/application.yaml
@@ -5,6 +5,7 @@ baseUrl: ${DATAHUB_BASE_URL:http://localhost:9002}
authentication:
# Enable if you want all requests to the Metadata Service to be authenticated.
enabled: ${METADATA_SERVICE_AUTH_ENABLED:true}
+ excludedPaths: /schema-registry/*,/health,/config,/config/search/export
# Disable if you want to skip validation of deleted user's tokens
enforceExistenceEnabled: ${METADATA_SERVICE_AUTH_ENFORCE_EXISTENCE_ENABLED:true}
@@ -330,16 +331,34 @@ neo4j:
connectionLivenessCheckTimeout: ${NEO4J_CONNECTION_LIVENESS_CHECK_TIMEOUT_IN_SECONDS:-1}
spring:
+ error:
+ include-message: never
+ include-stacktrace: never
+ include-exception: false
+ whitelabel:
+ enabled: false
+ jmx:
+ enabled: true
+ web:
+ resources:
+ add-mappings: false # do not serve static files
mvc:
servlet:
- path: /openapi
+ path: /
+ throw-exception-if-no-handler-found: true # throw exception on 404 to be handled
kafka:
security:
protocol: ${KAFKA_PROPERTIES_SECURITY_PROTOCOL:PLAINTEXT}
springdoc:
cache:
- disabled: true
+ disabled: false
+ swagger-ui:
+ path: /openapi/swagger-ui/index.html
+ api-docs:
+ path: /openapi/v3/api-docs
+ groups:
+ enabled: true
metadataTests:
enabled: ${METADATA_TESTS_ENABLED:false}
@@ -563,8 +582,6 @@ graphQL:
depthLimit: ${GRAPHQL_QUERY_DEPTH_LIMIT:50}
introspectionEnabled: ${GRAPHQL_QUERY_INTROSPECTION_ENABLED:true}
-springdoc.api-docs.groups.enabled: true
-
forms:
hook:
enabled: ${FORMS_HOOK_ENABLED:true}
diff --git a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/kafka/KafkaEventConsumerFactory.java b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/kafka/KafkaEventConsumerFactory.java
index a1ee4df360b7ec..625623d008e127 100644
--- a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/kafka/KafkaEventConsumerFactory.java
+++ b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/kafka/KafkaEventConsumerFactory.java
@@ -172,6 +172,7 @@ private KafkaListenerContainerFactory> buildDefaultKafkaListenerContainerFacto
factory.setConsumerFactory(factoryWithOverrides);
factory.setContainerCustomizer(new ThreadPoolContainerCustomizer());
factory.setConcurrency(kafkaEventConsumerConcurrency);
+ factory.setAutoStartup(false);
/* Sets up a delegating error handler for Deserialization errors, if disabled will
use DefaultErrorHandler (does back-off retry and then logs) rather than stopping the container. Stopping the container
@@ -202,6 +203,7 @@ protected KafkaListenerContainerFactory> duheKafkaEventConsumer(
factory.setConsumerFactory(kafkaConsumerFactory);
factory.setContainerCustomizer(new ThreadPoolContainerCustomizer());
factory.setConcurrency(1);
+ factory.setAutoStartup(false);
log.info(
"Event-based DUHE KafkaListenerContainerFactory built successfully. Consumer concurrency = 1");
diff --git a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/kafka/SimpleKafkaConsumerFactory.java b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/kafka/SimpleKafkaConsumerFactory.java
index 0193ded97f81b5..acab78dcfd5f52 100644
--- a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/kafka/SimpleKafkaConsumerFactory.java
+++ b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/kafka/SimpleKafkaConsumerFactory.java
@@ -53,6 +53,7 @@ protected KafkaListenerContainerFactory> createInstance(
new ConcurrentKafkaListenerContainerFactory<>();
factory.setContainerCustomizer(new ThreadPoolContainerCustomizer());
factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(customizedProperties));
+ factory.setAutoStartup(false);
log.info("Simple KafkaListenerContainerFactory built successfully");
diff --git a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/kafka/common/KafkaInitializationManager.java b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/kafka/common/KafkaInitializationManager.java
new file mode 100644
index 00000000000000..dcc818d82ee560
--- /dev/null
+++ b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/kafka/common/KafkaInitializationManager.java
@@ -0,0 +1,52 @@
+package com.linkedin.gms.factory.kafka.common;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.availability.AvailabilityChangeEvent;
+import org.springframework.boot.availability.ReadinessState;
+import org.springframework.context.event.EventListener;
+import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
+import org.springframework.stereotype.Component;
+
+@Slf4j
+@Component
+public class KafkaInitializationManager {
+ private final AtomicBoolean isInitialized = new AtomicBoolean(false);
+ private final KafkaListenerEndpointRegistry registry;
+
+ @Autowired
+ public KafkaInitializationManager(KafkaListenerEndpointRegistry registry) {
+ this.registry = registry;
+ log.info(
+ "Created KafkaInitializationManager. Waiting for application to be ready to enable kafka consumers.");
+ }
+
+ @EventListener
+ public void onStateChange(AvailabilityChangeEvent<ReadinessState> event) {
+ if (event.getState() == ReadinessState.ACCEPTING_TRAFFIC) {
+ initialize(this.getClass().getSimpleName());
+ }
+ }
+
+ public void initialize(String initializerName) {
+ if (isInitialized.compareAndSet(false, true)) {
+ int containerCount = registry.getAllListenerContainers().size();
+ log.info("Starting {} kafka consumers. Initialized by {}", containerCount, initializerName);
+ registry
+ .getAllListenerContainers()
+ .forEach(
+ container -> {
+ if (!container.isRunning()) {
+ container.start();
+ log.info("Started container: {}", container.getListenerId());
+ }
+ });
+ log.info("All {} kafka containers started.", containerCount);
+ }
+ }
+
+ public boolean isInitialized() {
+ return isInitialized.get();
+ }
+}
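
A sketch of how this manager is triggered: Spring Boot publishes an `AvailabilityChangeEvent` carrying `ReadinessState.ACCEPTING_TRAFFIC` once the application is ready, and the same event can be raised programmatically (the helper below is hypothetical, e.g. for tests or an upgrade hook):

```java
// Sketch: raising the readiness event that KafkaInitializationManager listens for.
import org.springframework.boot.availability.AvailabilityChangeEvent;
import org.springframework.boot.availability.ReadinessState;
import org.springframework.context.ConfigurableApplicationContext;

public final class ReadinessTriggerExample {
  private ReadinessTriggerExample() {}

  public static void markReady(ConfigurableApplicationContext context) {
    // onStateChange() reacts to this event and starts every registered
    // (autoStartup=false) listener container exactly once.
    AvailabilityChangeEvent.publish(context, ReadinessState.ACCEPTING_TRAFFIC);
  }
}
```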
diff --git a/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/OnBootApplicationListener.java b/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/OnBootApplicationListener.java
index 921246fa98f7a2..2d599f340f7588 100644
--- a/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/OnBootApplicationListener.java
+++ b/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/OnBootApplicationListener.java
@@ -1,22 +1,11 @@
package com.linkedin.metadata.boot;
-import com.linkedin.gms.factory.config.ConfigurationProvider;
-import com.linkedin.gms.factory.kafka.schemaregistry.InternalSchemaRegistryFactory;
+import com.linkedin.gms.factory.kafka.common.KafkaInitializationManager;
import io.datahubproject.metadata.context.OperationContext;
-import java.io.IOException;
-import java.util.Set;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
import javax.annotation.Nonnull;
import lombok.extern.slf4j.Slf4j;
-import org.apache.http.HttpStatus;
-import org.apache.http.StatusLine;
-import org.apache.http.client.methods.HttpGet;
-import org.apache.http.impl.client.CloseableHttpClient;
-import org.apache.http.impl.client.HttpClients;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
-import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Component;
@@ -27,98 +16,39 @@
@Component
public class OnBootApplicationListener {
- public static final String SCHEMA_REGISTRY_SERVLET_NAME = "dispatcher-schema-registry";
-
- private static final Set<Integer> ACCEPTED_HTTP_CODES =
- Set.of(
- HttpStatus.SC_OK,
- HttpStatus.SC_MOVED_PERMANENTLY,
- HttpStatus.SC_MOVED_TEMPORARILY,
- HttpStatus.SC_FORBIDDEN,
- HttpStatus.SC_UNAUTHORIZED);
-
- private static final String ROOT_WEB_APPLICATION_CONTEXT_ID =
- String.format("%s:", WebApplicationContext.class.getName());
-
- private final CloseableHttpClient httpClient = HttpClients.createDefault();
-
- private final ExecutorService executorService = Executors.newSingleThreadExecutor();
-
@Autowired
@Qualifier("bootstrapManager")
private BootstrapManager _bootstrapManager;
- @Autowired
- @Qualifier("configurationProvider")
- private ConfigurationProvider provider;
-
- @Value("${bootstrap.servlets.waitTimeout}")
- private int _servletsWaitTimeout;
-
@Autowired
@Qualifier("systemOperationContext")
private OperationContext systemOperationContext;
+ @Autowired private KafkaInitializationManager kafkaInitializationManager;
+
@EventListener(ContextRefreshedEvent.class)
public void onApplicationEvent(@Nonnull ContextRefreshedEvent event) {
+ String contextId = event.getApplicationContext().getId();
+ log.info("Context refreshed for ID: {}", contextId);
- if (SCHEMA_REGISTRY_SERVLET_NAME.equals(event.getApplicationContext().getId())) {
- log.info("Loading servlet {} without interruption.", SCHEMA_REGISTRY_SERVLET_NAME);
- return;
- }
-
- log.warn(
- "OnBootApplicationListener context refreshed! {} event: {}",
- ROOT_WEB_APPLICATION_CONTEXT_ID.equals(event.getApplicationContext().getId()),
- event);
- String schemaRegistryType = provider.getKafka().getSchemaRegistry().getType();
- if (ROOT_WEB_APPLICATION_CONTEXT_ID.equals(event.getApplicationContext().getId())) {
+ // For the root application context
+ if (event.getApplicationContext() instanceof WebApplicationContext) {
+ log.info("Root WebApplicationContext initialized, starting bootstrap process");
- // Handle race condition, if ebean code is executed while waiting/bootstrapping (i.e.
- // AuthenticationFilter)
+ // Initialize Ebean first
try {
Class.forName("io.ebean.XServiceProvider");
} catch (ClassNotFoundException e) {
- log.error(
- "Failure to initialize required class `io.ebean.XServiceProvider` during initialization.");
+ log.error("Failed to initialize io.ebean.XServiceProvider", e);
throw new RuntimeException(e);
}
- if (InternalSchemaRegistryFactory.TYPE.equals(schemaRegistryType)) {
- executorService.submit(isSchemaRegistryAPIServletReady());
- } else {
- _bootstrapManager.start(systemOperationContext);
- }
- }
- }
+ // Initialize consumers
+ kafkaInitializationManager.initialize(this.getClass().getSimpleName());
- public Runnable isSchemaRegistryAPIServletReady() {
- return () -> {
- final HttpGet request = new HttpGet(provider.getKafka().getSchemaRegistry().getUrl());
- int timeouts = _servletsWaitTimeout;
- boolean openAPIServletReady = false;
- while (!openAPIServletReady && timeouts > 0) {
- try {
- log.info("Sleeping for 1 second");
- Thread.sleep(1000);
- StatusLine statusLine = httpClient.execute(request).getStatusLine();
- if (ACCEPTED_HTTP_CODES.contains(statusLine.getStatusCode())) {
- log.info("Connected! Authentication not tested.");
- openAPIServletReady = true;
- }
- } catch (IOException | InterruptedException e) {
- log.info("Failed to connect to open servlet: {}", e.getMessage());
- }
- timeouts--;
- }
- if (!openAPIServletReady) {
- log.error(
- "Failed to bootstrap DataHub, OpenAPI servlet was not ready after {} seconds",
- timeouts);
- System.exit(1);
- } else {
- _bootstrapManager.start(systemOperationContext);
- }
- };
+ _bootstrapManager.start(systemOperationContext);
+ } else {
+ log.debug("Ignoring non-web application context refresh");
+ }
}
}
diff --git a/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/kafka/DataHubUpgradeKafkaListener.java b/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/kafka/DataHubUpgradeKafkaListener.java
index e69ab342740e43..50be0149ce2d4e 100644
--- a/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/kafka/DataHubUpgradeKafkaListener.java
+++ b/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/kafka/DataHubUpgradeKafkaListener.java
@@ -97,7 +97,8 @@ public void onPartitionsAssigned(
id = CONSUMER_GROUP,
topics = {TOPIC_NAME},
containerFactory = "duheKafkaEventConsumer",
- concurrency = "1")
+ concurrency = "1",
+ autoStartup = "false")
public void checkSystemVersion(final ConsumerRecord consumerRecord) {
try (Timer.Context i = MetricUtils.timer(this.getClass(), "checkSystemVersion").time()) {
final GenericRecord record = consumerRecord.value();
diff --git a/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/steps/IngestDefaultGlobalSettingsStep.java b/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/steps/IngestDefaultGlobalSettingsStep.java
index c63d71475c2fc8..a41553379b1faa 100644
--- a/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/steps/IngestDefaultGlobalSettingsStep.java
+++ b/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/steps/IngestDefaultGlobalSettingsStep.java
@@ -78,7 +78,7 @@ public void execute(@Nonnull OperationContext systemOperationContext)
// 1. Read from the file into JSON.
JsonNode defaultSettingsObj;
try {
- defaultSettingsObj = mapper.readTree(new ClassPathResource(_resourcePath).getFile());
+ defaultSettingsObj = mapper.readTree(new ClassPathResource(_resourcePath).getInputStream());
} catch (Exception e) {
throw new RuntimeException(
String.format(
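
The `getFile()` to `getInputStream()` switch matters because a `ClassPathResource` packaged inside a jar or war has no filesystem path: `getFile()` throws `FileNotFoundException`, while `getInputStream()` works in both exploded and packaged deployments. A minimal illustration (hypothetical helper):

```java
// Sketch: reading a classpath JSON resource in a jar-safe way.
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.InputStream;
import org.springframework.core.io.ClassPathResource;

public class ClasspathJsonExample {
  public static JsonNode readJson(ObjectMapper mapper, String resourcePath) throws Exception {
    try (InputStream in = new ClassPathResource(resourcePath).getInputStream()) {
      return mapper.readTree(in);
    }
  }
}
```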
diff --git a/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/steps/IngestPoliciesStep.java b/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/steps/IngestPoliciesStep.java
index dac2879487469c..04d73895802a8d 100644
--- a/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/steps/IngestPoliciesStep.java
+++ b/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/steps/IngestPoliciesStep.java
@@ -74,7 +74,7 @@ public void execute(@Nonnull OperationContext systemOperationContext)
log.info("Ingesting default access policies from: {}...", _policiesResource);
// 1. Read from the file into JSON.
- final JsonNode policiesObj = mapper.readTree(_policiesResource.getFile());
+ final JsonNode policiesObj = mapper.readTree(_policiesResource.getInputStream());
if (!policiesObj.isArray()) {
throw new RuntimeException(
diff --git a/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/steps/IngestRetentionPoliciesStep.java b/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/steps/IngestRetentionPoliciesStep.java
index b5ca0dee142df3..ac986bdf04827d 100644
--- a/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/steps/IngestRetentionPoliciesStep.java
+++ b/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/steps/IngestRetentionPoliciesStep.java
@@ -15,7 +15,6 @@
import com.linkedin.retention.DataHubRetentionConfig;
import io.datahubproject.metadata.context.OperationContext;
import jakarta.annotation.Nonnull;
-import java.io.File;
import java.io.IOException;
import java.net.URISyntaxException;
import java.util.Collections;
@@ -23,7 +22,9 @@
import java.util.Map;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
-import org.springframework.core.io.ClassPathResource;
+import org.springframework.core.io.Resource;
+import org.springframework.core.io.support.PathMatchingResourcePatternResolver;
+import org.springframework.core.io.support.ResourcePatternResolver;
@Slf4j
@RequiredArgsConstructor
@@ -77,12 +78,21 @@ public void execute(@Nonnull OperationContext systemOperationContext)
return;
}
- // 1. Read default retention config
- final Map retentionPolicyMap =
- parseFileOrDir(new ClassPathResource("./boot/retention.yaml").getFile());
+ ResourcePatternResolver resolver = new PathMatchingResourcePatternResolver();
- // 2. Read plugin retention config files from input path and overlay
- retentionPolicyMap.putAll(parseFileOrDir(new File(pluginPath)));
+ // 1. Read default retention config from classpath
+ Resource defaultResource = resolver.getResource("classpath:boot/retention.yaml");
+ Map retentionPolicyMap =
+ parseYamlRetentionConfig(defaultResource);
+
+ // 2. Read plugin retention config files from filesystem path
+ if (!pluginPath.isEmpty()) {
+ String pattern = "file:" + pluginPath + "/**/*.{yaml,yml}";
+ Resource[] resources = resolver.getResources(pattern);
+ for (Resource resource : resources) {
+ retentionPolicyMap.putAll(parseYamlRetentionConfig(resource));
+ }
+ }
// 4. Set the specified retention policies
log.info("Setting {} policies", retentionPolicyMap.size());
@@ -106,39 +116,6 @@ public void execute(@Nonnull OperationContext systemOperationContext)
BootstrapStep.setUpgradeResult(systemOperationContext, UPGRADE_ID_URN, _entityService);
}
- // Parse input yaml file or yaml files in the input directory to generate a retention policy map
- private Map parseFileOrDir(File retentionFileOrDir)
- throws IOException {
- // If path does not exist return empty
- if (!retentionFileOrDir.exists()) {
- return Collections.emptyMap();
- }
-
- // If directory, parse the yaml files under the directory
- if (retentionFileOrDir.isDirectory()) {
- Map result = new HashMap<>();
-
- for (File retentionFile : retentionFileOrDir.listFiles()) {
- if (!retentionFile.isFile()) {
- log.info(
- "Element {} in plugin directory {} is not a file. Skipping",
- retentionFile.getPath(),
- retentionFileOrDir.getPath());
- continue;
- }
- result.putAll(parseFileOrDir(retentionFile));
- }
- return result;
- }
- // If file, parse the yaml file and return result;
- if (!retentionFileOrDir.getPath().endsWith(".yaml")
- && retentionFileOrDir.getPath().endsWith(".yml")) {
- log.info("File {} is not a YAML file. Skipping", retentionFileOrDir.getPath());
- return Collections.emptyMap();
- }
- return parseYamlRetentionConfig(retentionFileOrDir);
- }
-
/**
* Parse yaml retention config
*
@@ -147,8 +124,11 @@ private Map parseFileOrDir(File ret
* converted into the {@link com.linkedin.retention.DataHubRetentionConfig} class.
*/
private Map parseYamlRetentionConfig(
- File retentionConfigFile) throws IOException {
- final JsonNode retentionPolicies = YAML_MAPPER.readTree(retentionConfigFile);
+ Resource resource) throws IOException {
+ if (!resource.exists()) {
+ return Collections.emptyMap();
+ }
+ final JsonNode retentionPolicies = YAML_MAPPER.readTree(resource.getInputStream());
if (!retentionPolicies.isArray()) {
throw new IllegalArgumentException(
"Retention config file must contain an array of retention policies");
diff --git a/metadata-service/factories/src/main/java/com/linkedin/r2/transport/http/server/AbstractJakartaR2Servlet.java b/metadata-service/factories/src/main/java/com/linkedin/r2/transport/http/server/AbstractJakartaR2Servlet.java
index 8d589c4ab2408b..9c4117202ed570 100644
--- a/metadata-service/factories/src/main/java/com/linkedin/r2/transport/http/server/AbstractJakartaR2Servlet.java
+++ b/metadata-service/factories/src/main/java/com/linkedin/r2/transport/http/server/AbstractJakartaR2Servlet.java
@@ -1,35 +1,9 @@
-/*
- Copyright (c) 2012 LinkedIn Corp.
- Copyright (c) 2023 Acryl Data, Inc.
-
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
-
- MODIFICATIONS:
- Changed javax packages to jakarta for Spring Boot 3 support
-*/
-
-/* $Id$ */
package com.linkedin.r2.transport.http.server;
import com.linkedin.data.ByteString;
import com.linkedin.r2.message.RequestContext;
-import com.linkedin.r2.message.rest.RestException;
-import com.linkedin.r2.message.rest.RestRequest;
-import com.linkedin.r2.message.rest.RestRequestBuilder;
-import com.linkedin.r2.message.rest.RestResponse;
-import com.linkedin.r2.message.rest.RestStatus;
+import com.linkedin.r2.message.rest.*;
import com.linkedin.r2.transport.common.WireAttributeHelper;
-import com.linkedin.r2.transport.common.bridge.common.TransportCallback;
import com.linkedin.r2.transport.common.bridge.common.TransportResponse;
import com.linkedin.r2.transport.common.bridge.common.TransportResponseImpl;
import com.linkedin.r2.transport.http.common.HttpConstants;
@@ -40,210 +14,177 @@
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
-import java.util.Enumeration;
-import java.util.Map;
-import java.util.concurrent.CountDownLatch;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * @author Steven Ihde
- * @author Chris Pettitt
- * @author Fatih Emekci
- * @version $Revision$
- */
+import java.util.concurrent.TimeoutException;
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
public abstract class AbstractJakartaR2Servlet extends HttpServlet {
- private static final Logger _log = LoggerFactory.getLogger(AbstractJakartaR2Servlet.class);
- private static final long serialVersionUID = 0L;
+ private static final long serialVersionUID = 1L;
- // servlet timeout in ms.
- protected final long _timeout;
+ private final Duration timeout;
protected abstract HttpDispatcher getDispatcher();
- public AbstractJakartaR2Servlet(long timeout) {
- _timeout = timeout;
+ protected AbstractJakartaR2Servlet(Duration timeout) {
+ this.timeout = timeout;
}
@Override
- protected void service(final HttpServletRequest req, final HttpServletResponse resp)
+ protected void service(HttpServletRequest req, HttpServletResponse resp)
throws ServletException, IOException {
- RequestContext requestContext = JakartaServletHelper.readRequestContext(req);
-
- RestRequest restRequest;
-
try {
- restRequest = readFromServletRequest(req);
- } catch (URISyntaxException e) {
- writeToServletError(resp, RestStatus.BAD_REQUEST, e.toString());
- return;
- }
+ RequestContext requestContext = JakartaServletHelper.readRequestContext(req);
+ RestRequest restRequest = createRestRequest(req);
- final AtomicReference<TransportResponse<RestResponse>> result = new AtomicReference<>();
- final CountDownLatch latch = new CountDownLatch(1);
+ CompletableFuture<TransportResponse<RestResponse>> future = new CompletableFuture<>();
- TransportCallback<RestResponse> callback =
- new TransportCallback<RestResponse>() {
- @Override
- public void onResponse(TransportResponse<RestResponse> response) {
- result.set(response);
- latch.countDown();
- }
- };
+ getDispatcher().handleRequest(restRequest, requestContext, future::complete);
- getDispatcher().handleRequest(restRequest, requestContext, callback);
+ TransportResponse<RestResponse> result =
+ future
+ .orTimeout(timeout.toMillis(), TimeUnit.MILLISECONDS)
+ .exceptionally(this::handleException)
+ .join();
- try {
- if (latch.await(_timeout, TimeUnit.MILLISECONDS)) {
- writeToServletResponse(result.get(), resp);
- } else {
- writeToServletError(
- resp, RestStatus.INTERNAL_SERVER_ERROR, "Server Timeout after " + _timeout + "ms.");
- }
- } catch (InterruptedException e) {
- throw new ServletException("Interrupted!", e);
- }
- }
+ writeResponse(result, resp);
- protected void writeToServletResponse(
- TransportResponse<RestResponse> response, HttpServletResponse resp) throws IOException {
- Map<String, String> wireAttrs = response.getWireAttributes();
- for (Map.Entry<String, String> e : WireAttributeHelper.toWireAttributes(wireAttrs).entrySet()) {
- resp.setHeader(e.getKey(), e.getValue());
- }
-
- RestResponse restResponse = null;
- if (response.hasError()) {
- Throwable e = response.getError();
- if (e instanceof RestException) {
- restResponse = ((RestException) e).getResponse();
- }
- if (restResponse == null) {
- restResponse = RestStatus.responseForError(RestStatus.INTERNAL_SERVER_ERROR, e);
- }
- } else {
- restResponse = response.getResponse();
- }
-
- resp.setStatus(restResponse.getStatus());
- Map<String, String> headers = restResponse.getHeaders();
- for (Map.Entry<String, String> e : headers.entrySet()) {
- // TODO multi-valued headers
- resp.setHeader(e.getKey(), e.getValue());
+ } catch (URISyntaxException e) {
+ writeError(resp, RestStatus.BAD_REQUEST, "Invalid URI: " + e.getMessage());
+ } catch (Exception e) {
+ log.error("Unexpected error processing request", e);
+ writeError(resp, RestStatus.INTERNAL_SERVER_ERROR, "Internal server error");
}
+ }
- for (String cookie : restResponse.getCookies()) {
- resp.addHeader(HttpConstants.RESPONSE_COOKIE_HEADER_NAME, cookie);
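+ // Timeouts become a 500 RestException with a descriptive message; all other failures are wrapped as transport errors.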
+ private TransportResponse<RestResponse> handleException(Throwable ex) {
+ if (ex instanceof TimeoutException) {
+ RestResponse errorResponse =
+ RestStatus.responseForError(
+ RestStatus.INTERNAL_SERVER_ERROR,
+ new RuntimeException("Server timeout after " + timeout.toSeconds() + " seconds"));
+ return TransportResponseImpl.error(new RestException(errorResponse));
}
+ return TransportResponseImpl.error(ex);
+ }
- final ByteString entity = restResponse.getEntity();
- entity.write(resp.getOutputStream());
-
- resp.getOutputStream().close();
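+ // Builds an R2 RestRequest from the servlet request: URI (path + query), method, headers, cookies, and body.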
+ private RestRequest createRestRequest(HttpServletRequest req)
+ throws IOException, ServletException, URISyntaxException {
+ String pathInfo = extractPathInfo(req);
+ String queryString = Optional.ofNullable(req.getQueryString()).map(q -> "?" + q).orElse("");
+
+ URI uri = new URI(pathInfo + queryString);
+
+ RestRequestBuilder builder = new RestRequestBuilder(uri).setMethod(req.getMethod());
+
+ // Handle headers
+ Collections.list(req.getHeaderNames())
+ .forEach(
+ headerName -> {
+ if (headerName.equalsIgnoreCase(HttpConstants.REQUEST_COOKIE_HEADER_NAME)) {
+ Collections.list(req.getHeaders(headerName)).forEach(builder::addCookie);
+ } else {
+ Collections.list(req.getHeaders(headerName))
+ .forEach(value -> builder.addHeaderValue(headerName, value));
+ }
+ });
+
+ // Handle request body
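+ // When Content-Length is absent (e.g. chunked transfer encoding or HTTP/2), read the stream to completion.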
+ int contentLength = req.getContentLength();
+ ByteString entity =
+ (contentLength > 0)
+ ? ByteString.read(req.getInputStream(), contentLength)
+ : ByteString.read(req.getInputStream());
+
+ builder.setEntity(entity);
+
+ return builder.build();
}
- protected void writeToServletError(HttpServletResponse resp, int statusCode, String message)
+ private void writeResponse(TransportResponse<RestResponse> response, HttpServletResponse resp)
throws IOException {
- RestResponse restResponse = RestStatus.responseForStatus(statusCode, message);
- writeToServletResponse(TransportResponseImpl.success(restResponse), resp);
- }
+ // Write wire attributes
+ WireAttributeHelper.toWireAttributes(response.getWireAttributes()).forEach(resp::setHeader);
+
+ // Get response or create error response
+ RestResponse restResponse =
+ Optional.of(response)
+ .filter(TransportResponse::hasError)
+ .map(
+ r -> {
+ Throwable error = r.getError();
+ if (error instanceof RestException) {
+ return ((RestException) error).getResponse();
+ }
+ return RestStatus.responseForError(RestStatus.INTERNAL_SERVER_ERROR, error);
+ })
+ .orElseGet(response::getResponse);
+
+ // Write status and headers
+ resp.setStatus(restResponse.getStatus());
+ restResponse.getHeaders().forEach(resp::setHeader);
- protected RestRequest readFromServletRequest(HttpServletRequest req)
- throws IOException, ServletException, URISyntaxException {
- StringBuilder sb = new StringBuilder();
- sb.append(extractPathInfo(req));
- String query = req.getQueryString();
- if (query != null) {
- sb.append('?');
- sb.append(query);
- }
+ // Write cookies
+ restResponse
+ .getCookies()
+ .forEach(cookie -> resp.addHeader(HttpConstants.RESPONSE_COOKIE_HEADER_NAME, cookie));
- URI uri = new URI(sb.toString());
-
- RestRequestBuilder rb = new RestRequestBuilder(uri);
- rb.setMethod(req.getMethod());
-
- for (Enumeration<String> headerNames = req.getHeaderNames(); headerNames.hasMoreElements(); ) {
- String headerName = headerNames.nextElement();
- if (headerName.equalsIgnoreCase(HttpConstants.REQUEST_COOKIE_HEADER_NAME)) {
- for (Enumeration<String> cookies = req.getHeaders(headerName);
- cookies.hasMoreElements(); ) {
- rb.addCookie(cookies.nextElement());
- }
- } else {
- for (Enumeration<String> headerValues = req.getHeaders(headerName);
- headerValues.hasMoreElements(); ) {
- rb.addHeaderValue(headerName, headerValues.nextElement());
- }
- }
+ // Write response body
+ try (var outputStream = resp.getOutputStream()) {
+ restResponse.getEntity().write(outputStream);
}
+ }
- int length = req.getContentLength();
- if (length > 0) {
- rb.setEntity(ByteString.read(req.getInputStream(), length));
- } else {
- // Known cases for not sending a content-length header in a request
- // 1. Chunked transfer encoding
- // 2. HTTP/2
- rb.setEntity(ByteString.read(req.getInputStream()));
- }
- return rb.build();
+ private void writeError(HttpServletResponse resp, int statusCode, String message)
+ throws IOException {
+ RestResponse errorResponse = RestStatus.responseForStatus(statusCode, message);
+ writeResponse(TransportResponseImpl.success(errorResponse), resp);
}
- /**
- * Attempts to return a "non decoded" pathInfo by stripping off the contextPath and servletPath
- * parts of the requestURI. As a defensive measure, this method will return the "decoded" pathInfo
- * directly by calling req.getPathInfo() if it is unable to strip off the contextPath or
- * servletPath.
- *
- * @throws javax.servlet.ServletException if resulting pathInfo is empty
- */
protected static String extractPathInfo(HttpServletRequest req) throws ServletException {
- // For "http:hostname:8080/contextPath/servletPath/pathInfo" the RequestURI is
- // "/contextPath/servletPath/pathInfo"
- // where the contextPath, servletPath and pathInfo parts all contain their leading slash.
-
- // stripping contextPath and servletPath this way is not fully compatible with the HTTP spec.
- // If a
- // request for, say "/%75scp-proxy/reso%75rces" is made (where %75 decodes to 'u')
- // the stripping off of contextPath and servletPath will fail because the requestUri string will
- // include the encoded char but the contextPath and servletPath strings will not.
String requestUri = req.getRequestURI();
- String contextPath = req.getContextPath();
- StringBuilder builder = new StringBuilder();
- if (contextPath != null) {
- builder.append(contextPath);
- }
+ String contextPath = Optional.ofNullable(req.getContextPath()).orElse("");
+ String servletPath = Optional.ofNullable(req.getServletPath()).orElse("");
- String servletPath = req.getServletPath();
- if (servletPath != null) {
- builder.append(servletPath);
- }
- String prefix = builder.toString();
- String pathInfo;
- if (prefix.length() == 0) {
+ String prefix = contextPath + servletPath;
+ String pathInfo = null;
+
+ if (prefix.isEmpty()) {
pathInfo = requestUri;
- } else if (requestUri.startsWith(prefix)) {
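+ // Strip the context/servlet prefix only for the /gms mapping; other mappings fall through and keep the full request URI.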
+ } else if (servletPath.startsWith("/gms") && requestUri.startsWith(prefix)) {
pathInfo = requestUri.substring(prefix.length());
- } else {
- _log.warn(
- "Unable to extract 'non decoded' pathInfo, returning 'decoded' pathInfo instead. This may cause issues processing request URIs containing special characters. requestUri="
- + requestUri);
- return req.getPathInfo();
}
- if (pathInfo.length() == 0) {
- // We prefer to keep servlet mapping trivial with R2 and have R2
- // TransportDispatchers make most of the routing decisions based on the 'pathInfo'
- // and query parameters in the URI.
- // If pathInfo is null, it's highly likely that the servlet was mapped to an exact
- // path or to a file extension, making such R2-based services too reliant on the
- // servlet container for routing
- throw new ServletException(
- "R2 servlet should only be mapped via wildcard path mapping e.g. /r2/*. "
- + "Exact path matching (/r2) and file extension mappings (*.r2) are currently not supported");
+ if (pathInfo == null || pathInfo.isEmpty()) {
+ log.debug(
+ "Previously invalid servlet mapping detected. Request details: method='{}', requestUri='{}', contextPath='{}', "
+ + "servletPath='{}', prefix='{}', pathInfo='{}', queryString='{}', protocol='{}', remoteAddr='{}', "
+ + "serverName='{}', serverPort={}, contentType='{}', characterEncoding='{}'",
+ req.getMethod(),
+ requestUri,
+ contextPath,
+ servletPath,
+ prefix,
+ pathInfo,
+ req.getQueryString(),
+ req.getProtocol(),
+ req.getRemoteAddr(),
+ req.getServerName(),
+ req.getServerPort(),
+ req.getContentType(),
+ req.getCharacterEncoding());
+
+ /* NOTE: Working around what was previously considered an error.
+ * throw new ServletException(
+ * "R2 servlet must be mapped using wildcard path mapping (e.g., /r2/*). " +
+ * "Exact path matching (/r2) and file extension mappings (*.r2) are not supported.");
+ **/
+
+ pathInfo = requestUri;
}
return pathInfo;
diff --git a/metadata-service/factories/src/main/java/com/linkedin/r2/transport/http/server/RAPJakartaServlet.java b/metadata-service/factories/src/main/java/com/linkedin/r2/transport/http/server/RAPJakartaServlet.java
index 75e808d5245c83..f0db3cd01ffcf9 100644
--- a/metadata-service/factories/src/main/java/com/linkedin/r2/transport/http/server/RAPJakartaServlet.java
+++ b/metadata-service/factories/src/main/java/com/linkedin/r2/transport/http/server/RAPJakartaServlet.java
@@ -1,44 +1,21 @@
-/*
- Copyright (c) 2012 LinkedIn Corp.
- Copyright (c) 2023 Acryl Data, Inc.
-
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
-
- MODIFICATIONS:
- Changed javax dependencies to jakarta for Spring Boot 3 support
-*/
-
-/** $Id: $ */
package com.linkedin.r2.transport.http.server;
import com.linkedin.r2.transport.common.bridge.server.TransportDispatcher;
+import java.time.Duration;
+import lombok.Getter;
-/**
- * @author Steven Ihde
- * @version $Revision: $
- */
public class RAPJakartaServlet extends AbstractJakartaR2Servlet {
private static final long serialVersionUID = 0L;
- private final HttpDispatcher _dispatcher;
+ @Getter private final HttpDispatcher dispatcher;
- public RAPJakartaServlet(HttpDispatcher dispatcher) {
- super(Long.MAX_VALUE);
- _dispatcher = dispatcher;
+ public RAPJakartaServlet(HttpDispatcher dispatcher, int timeoutSeconds) {
+ super(Duration.ofSeconds(timeoutSeconds));
+ this.dispatcher = dispatcher;
}
- public RAPJakartaServlet(TransportDispatcher dispatcher) {
- this(HttpDispatcherFactory.create((dispatcher)));
+ public RAPJakartaServlet(TransportDispatcher dispatcher, int timeoutSeconds) {
+ this(HttpDispatcherFactory.create((dispatcher)), timeoutSeconds);
}
/**
@@ -48,22 +25,7 @@ public RAPJakartaServlet(TransportDispatcher dispatcher) {
*/
public RAPJakartaServlet(
HttpDispatcher dispatcher, boolean useContinuations, int timeOut, int timeOutDelta) {
- super(timeOut);
- _dispatcher = dispatcher;
- }
-
- /**
- * Initialize the RAPJakartaServlet.
- *
- * @see #AbstractJakartaR2Servlet
- */
- public RAPJakartaServlet(
- TransportDispatcher dispatcher, boolean useContinuations, int timeOut, int timeOutDelta) {
- this(HttpDispatcherFactory.create((dispatcher)), useContinuations, timeOut, timeOutDelta);
- }
-
- @Override
- protected HttpDispatcher getDispatcher() {
- return _dispatcher;
+ super(Duration.ofSeconds(timeOut));
+ this.dispatcher = dispatcher;
}
}
diff --git a/metadata-service/factories/src/main/java/com/linkedin/restli/server/RAPServletFactory.java b/metadata-service/factories/src/main/java/com/linkedin/restli/server/RAPServletFactory.java
index be060477aeb1f6..73db7ec73f4d7e 100644
--- a/metadata-service/factories/src/main/java/com/linkedin/restli/server/RAPServletFactory.java
+++ b/metadata-service/factories/src/main/java/com/linkedin/restli/server/RAPServletFactory.java
@@ -26,6 +26,9 @@ public class RAPServletFactory {
@Value("#{systemEnvironment['RESTLI_SERVLET_THREADS']}")
private Integer environmentThreads;
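+ // Rest.li servlet timeout in seconds, configurable via RESTLI_TIMEOUT_SECONDS (default 60).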
+ @Value("${RESTLI_TIMEOUT_SECONDS:60}")
+ private int restliTimeoutSeconds;
+
@Value("${" + INGESTION_MAX_SERIALIZED_STRING_LENGTH + ":16000000}")
private int maxSerializedStringLength;
@@ -71,6 +74,7 @@ public RAPJakartaServlet rapServlet(
RestLiServer restLiServer = new RestLiServer(config, springInjectResourceFactory, parseqEngine);
return new RAPJakartaServlet(
new FilterChainDispatcher(
- new DelegatingTransportDispatcher(restLiServer, restLiServer), FilterChains.empty()));
+ new DelegatingTransportDispatcher(restLiServer, restLiServer), FilterChains.empty()),
+ restliTimeoutSeconds);
}
}
diff --git a/metadata-service/factories/src/main/java/com/linkedin/restli/server/RestliHandlerServlet.java b/metadata-service/factories/src/main/java/com/linkedin/restli/server/RestliHandlerServlet.java
index bfc25b7ddaef50..4a38b331d95fed 100644
--- a/metadata-service/factories/src/main/java/com/linkedin/restli/server/RestliHandlerServlet.java
+++ b/metadata-service/factories/src/main/java/com/linkedin/restli/server/RestliHandlerServlet.java
@@ -8,26 +8,25 @@
import java.io.IOException;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
-import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.HttpRequestHandler;
import org.springframework.web.context.support.HttpRequestHandlerServlet;
@Slf4j
@AllArgsConstructor
public class RestliHandlerServlet extends HttpRequestHandlerServlet implements HttpRequestHandler {
- @Autowired private RAPJakartaServlet _r2Servlet;
+ private final RAPJakartaServlet r2Servlet;
@Override
public void init(ServletConfig config) throws ServletException {
log.info("Initializing RestliHandlerServlet");
- this._r2Servlet.init(config);
+ this.r2Servlet.init(config);
log.info("Initialized RestliHandlerServlet");
}
@Override
public void service(HttpServletRequest req, HttpServletResponse res)
throws ServletException, IOException {
- _r2Servlet.service(req, res);
+ r2Servlet.service(req, res);
}
@Override
diff --git a/metadata-service/graphql-servlet-impl/src/main/java/com/datahub/graphql/GraphQLController.java b/metadata-service/graphql-servlet-impl/src/main/java/com/datahub/graphql/GraphQLController.java
index 2f66b30f55844c..6e02db6eb65239 100644
--- a/metadata-service/graphql-servlet-impl/src/main/java/com/datahub/graphql/GraphQLController.java
+++ b/metadata-service/graphql-servlet-impl/src/main/java/com/datahub/graphql/GraphQLController.java
@@ -37,10 +37,12 @@
import org.springframework.web.HttpRequestMethodNotSupportedException;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
+import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@Slf4j
@RestController
+@RequestMapping("/api")
public class GraphQLController {
public GraphQLController() {
diff --git a/metadata-service/graphql-servlet-impl/src/main/java/com/datahub/graphql/GraphiQLController.java b/metadata-service/graphql-servlet-impl/src/main/java/com/datahub/graphql/GraphiQLController.java
index 61f2720c6cfca4..0461af4db11cc8 100644
--- a/metadata-service/graphql-servlet-impl/src/main/java/com/datahub/graphql/GraphiQLController.java
+++ b/metadata-service/graphql-servlet-impl/src/main/java/com/datahub/graphql/GraphiQLController.java
@@ -32,7 +32,7 @@ public GraphiQLController() {
}
}
- @GetMapping(value = "/graphiql", produces = MediaType.TEXT_HTML_VALUE)
+ @GetMapping(value = "/api/graphiql", produces = MediaType.TEXT_HTML_VALUE)
@ResponseBody
CompletableFuture<String> graphiQL() {
return GraphQLConcurrencyUtils.supplyAsync(
diff --git a/metadata-service/openapi-analytics-servlet/src/main/resources/JavaSpring/apiController.mustache b/metadata-service/openapi-analytics-servlet/src/main/resources/JavaSpring/apiController.mustache
index ddf1c130d7be69..fbeb0ba7454a8b 100644
--- a/metadata-service/openapi-analytics-servlet/src/main/resources/JavaSpring/apiController.mustache
+++ b/metadata-service/openapi-analytics-servlet/src/main/resources/JavaSpring/apiController.mustache
@@ -62,7 +62,7 @@ import java.util.concurrent.Callable;
{{/isJava8or11}}
{{>generatedAnnotation}}
@Controller
-@RequestMapping("/v2/analytics")
+@RequestMapping("/openapi/v2/analytics")
{{#operations}}
public class {{classname}}Controller implements {{classname}} {
diff --git a/metadata-service/openapi-entity-servlet/src/main/resources/JavaSpring/apiController.mustache b/metadata-service/openapi-entity-servlet/src/main/resources/JavaSpring/apiController.mustache
index fbf354ff91688f..78bb061b5d50a4 100644
--- a/metadata-service/openapi-entity-servlet/src/main/resources/JavaSpring/apiController.mustache
+++ b/metadata-service/openapi-entity-servlet/src/main/resources/JavaSpring/apiController.mustache
@@ -80,7 +80,7 @@ import java.util.concurrent.Callable;
{{/useOas2}}
{{#operations}}
@RestController
-@RequestMapping("/v2/entity")
+@RequestMapping("/openapi/v2/entity")
public class {{classname}}Controller implements {{classname}} {
private static final Logger log = LoggerFactory.getLogger({{classname}}Controller.class);
diff --git a/metadata-service/openapi-entity-servlet/src/test/java/io/datahubproject/openapi/v2/delegates/EntityApiDelegateImplTest.java b/metadata-service/openapi-entity-servlet/src/test/java/io/datahubproject/openapi/v2/delegates/EntityApiDelegateImplTest.java
index 31b35b65ea1a8c..fc91b0990a333a 100644
--- a/metadata-service/openapi-entity-servlet/src/test/java/io/datahubproject/openapi/v2/delegates/EntityApiDelegateImplTest.java
+++ b/metadata-service/openapi-entity-servlet/src/test/java/io/datahubproject/openapi/v2/delegates/EntityApiDelegateImplTest.java
@@ -301,7 +301,7 @@ public void customModelTest() throws Exception {
mockMvc
.perform(
- MockMvcRequestBuilders.post("/v2/entity/dataset")
+ MockMvcRequestBuilders.post("/openapi/v2/entity/dataset")
.content(body)
.contentType(MediaType.APPLICATION_JSON)
.accept(MediaType.APPLICATION_JSON))
diff --git a/metadata-service/openapi-servlet/src/main/java/io/datahubproject/openapi/config/GlobalControllerExceptionHandler.java b/metadata-service/openapi-servlet/src/main/java/io/datahubproject/openapi/config/GlobalControllerExceptionHandler.java
index 01493d71643481..b78121573811f6 100644
--- a/metadata-service/openapi-servlet/src/main/java/io/datahubproject/openapi/config/GlobalControllerExceptionHandler.java
+++ b/metadata-service/openapi-servlet/src/main/java/io/datahubproject/openapi/config/GlobalControllerExceptionHandler.java
@@ -18,6 +18,7 @@
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.ControllerAdvice;
import org.springframework.web.bind.annotation.ExceptionHandler;
+import org.springframework.web.servlet.NoHandlerFoundException;
import org.springframework.web.servlet.mvc.support.DefaultHandlerExceptionResolver;
@Slf4j
@@ -88,4 +89,13 @@ public ResponseEntity