diff --git a/build.gradle b/build.gradle index fbced335ddc2e..07ca1f09e813c 100644 --- a/build.gradle +++ b/build.gradle @@ -52,7 +52,7 @@ buildscript { ext.hadoop3Version = '3.3.6' ext.kafkaVersion = '5.5.15' ext.hazelcastVersion = '5.3.6' - ext.ebeanVersion = '12.16.1' + ext.ebeanVersion = '15.5.2' ext.googleJavaFormatVersion = '1.18.1' ext.openLineageVersion = '1.19.0' ext.logbackClassicJava8 = '1.2.12' @@ -104,8 +104,8 @@ project.ext.spec = [ project.ext.externalDependency = [ 'akkaHttp': 'com.typesafe.akka:akka-http-core_2.12:10.2.10', - 'antlr4Runtime': 'org.antlr:antlr4-runtime:4.7.2', - 'antlr4': 'org.antlr:antlr4:4.7.2', + 'antlr4Runtime': 'org.antlr:antlr4-runtime:4.9.3', + 'antlr4': 'org.antlr:antlr4:4.9.3', 'assertJ': 'org.assertj:assertj-core:3.11.1', 'avro': 'org.apache.avro:avro:1.11.3', 'avroCompiler': 'org.apache.avro:avro-compiler:1.11.3', @@ -129,8 +129,10 @@ project.ext.externalDependency = [ 'dropwizardMetricsCore': 'io.dropwizard.metrics:metrics-core:4.2.3', 'dropwizardMetricsJmx': 'io.dropwizard.metrics:metrics-jmx:4.2.3', 'ebean': 'io.ebean:ebean:' + ebeanVersion, + 'ebeanTest': 'io.ebean:ebean-test:' + ebeanVersion, 'ebeanAgent': 'io.ebean:ebean-agent:' + ebeanVersion, 'ebeanDdl': 'io.ebean:ebean-ddl-generator:' + ebeanVersion, + 'ebeanQueryBean': 'io.ebean:querybean-generator:' + ebeanVersion, 'elasticSearchRest': 'org.opensearch.client:opensearch-rest-high-level-client:' + elasticsearchVersion, 'elasticSearchJava': 'org.opensearch.client:opensearch-java:2.6.0', 'findbugsAnnotations': 'com.google.code.findbugs:annotations:3.0.1', @@ -359,6 +361,9 @@ configure(subprojects.findAll {! 
it.name.startsWith('spark-lineage')}) { exclude group: "org.slf4j", module: "slf4j-log4j12" exclude group: "org.slf4j", module: "slf4j-nop" exclude group: "org.slf4j", module: "slf4j-ext" + + resolutionStrategy.force externalDependency.antlr4Runtime + resolutionStrategy.force externalDependency.antlr4 } } diff --git a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/nocode/CreateAspectTableStep.java b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/nocode/CreateAspectTableStep.java index 4855cef95cb6e..63b319d943a80 100644 --- a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/nocode/CreateAspectTableStep.java +++ b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/nocode/CreateAspectTableStep.java @@ -77,7 +77,7 @@ public Function executable() { } try { - _server.execute(_server.createSqlUpdate(sqlUpdateStr)); + _server.execute(_server.sqlUpdate(sqlUpdateStr)); } catch (Exception e) { context.report().addLine("Failed to create table metadata_aspect_v2", e); return new DefaultUpgradeStepResult(id(), DataHubUpgradeState.FAILED); diff --git a/metadata-integration/java/acryl-spark-lineage/scripts/check_jar.sh b/metadata-integration/java/acryl-spark-lineage/scripts/check_jar.sh index 41b09e0705b89..d679f270095f1 100755 --- a/metadata-integration/java/acryl-spark-lineage/scripts/check_jar.sh +++ b/metadata-integration/java/acryl-spark-lineage/scripts/check_jar.sh @@ -42,7 +42,11 @@ for jarFile in ${jarFiles}; do grep -v "MetadataChangeProposal.avsc" |\ grep -v "io.openlineage" |\ grep -v "org.apache" |\ - grep -v "aix" + grep -v "aix" |\ + grep -v "scala" |\ + grep -v "io/micrometer/" |\ + grep -vE "library.properties|rootdoc.txt" |\ + grep -v "com/ibm/.*" if [ $?
-ne 0 ]; then diff --git a/metadata-integration/java/datahub-protobuf/scripts/check_jar.sh b/metadata-integration/java/datahub-protobuf/scripts/check_jar.sh index fe3dd8d18f699..bd0c28f0f8698 100755 --- a/metadata-integration/java/datahub-protobuf/scripts/check_jar.sh +++ b/metadata-integration/java/datahub-protobuf/scripts/check_jar.sh @@ -41,7 +41,10 @@ jar -tvf $jarFile |\ grep -v "aix" |\ grep -v "com/sun/" |\ grep -v "VersionInfo.java" |\ - grep -v "mime.types" + grep -v "mime.types" |\ + grep -v "com/ibm/.*" |\ + grep -v "org/glassfish/" |\ + grep -v "LICENSE" if [ $? -ne 0 ]; then echo "✅ No unexpected class paths found in ${jarFile}" diff --git a/metadata-integration/java/spark-lineage-legacy/scripts/check_jar.sh b/metadata-integration/java/spark-lineage-legacy/scripts/check_jar.sh index 2dd3743ae2ced..81d6a541d1c2a 100755 --- a/metadata-integration/java/spark-lineage-legacy/scripts/check_jar.sh +++ b/metadata-integration/java/spark-lineage-legacy/scripts/check_jar.sh @@ -39,7 +39,8 @@ jar -tvf $jarFile |\ grep -v "library.properties" |\ grep -v "rootdoc.txt" |\ grep -v "VersionInfo.java" |\ - grep -v "mime.types" + grep -v "mime.types" |\ + grep -v "com/ibm/.*" if [ $? 
-ne 0 ]; then diff --git a/metadata-io/build.gradle b/metadata-io/build.gradle index 7e72767c08b79..09a41d100199d 100644 --- a/metadata-io/build.gradle +++ b/metadata-io/build.gradle @@ -1,6 +1,7 @@ plugins { id 'java-library' id 'pegasus' + id 'io.ebean' version "${ebeanVersion}" // Use the latest version from global build.gradle } configurations { @@ -50,8 +51,9 @@ dependencies { runtimeOnly externalDependency.jna api externalDependency.kafkaClients api externalDependency.ebean - enhance externalDependency.ebeanAgent + annotationProcessor externalDependency.ebeanQueryBean implementation externalDependency.ebeanDdl + implementation externalDependency.ebeanAgent implementation externalDependency.opentelemetryAnnotations implementation externalDependency.resilience4j // Newer Spring libraries require JDK17 classes, allow for JDK11 @@ -89,6 +91,7 @@ dependencies { testImplementation externalDependency.lombok testImplementation externalDependency.springBootTest testImplementation spec.product.pegasus.restliServer + testImplementation externalDependency.ebeanTest // logback >=1.3 required due to `testcontainers` only testImplementation 'ch.qos.logback:logback-classic:1.4.7' @@ -139,17 +142,12 @@ test { testLogging.exceptionFormat = 'full' } -tasks.withType(Test) { - enableAssertions = false +ebean { + debugLevel = 1 // 0 - 9 } -project.compileJava { - doLast { - ant.taskdef(name: 'ebean', classname: 'io.ebean.enhance.ant.AntEnhanceTask', - classpath: project.configurations.enhance.asPath) - ant.ebean(classSource: "${project.buildDir}/classes/java/main", packages: 'com.linkedin.metadata.entity.ebean', - transformArgs: 'debug=1') - } +tasks.withType(Test) { + enableAssertions = false } clean { diff --git a/metadata-io/src/main/java/com/linkedin/metadata/entity/EntityServiceImpl.java b/metadata-io/src/main/java/com/linkedin/metadata/entity/EntityServiceImpl.java index 69135a8a64805..34c98bba01af4 100644 --- 
a/metadata-io/src/main/java/com/linkedin/metadata/entity/EntityServiceImpl.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/entity/EntityServiceImpl.java @@ -85,6 +85,7 @@ import com.linkedin.util.Pair; import io.datahubproject.metadata.context.OperationContext; import io.opentelemetry.extension.annotations.WithSpan; +import jakarta.persistence.EntityNotFoundException; import java.net.URISyntaxException; import java.nio.charset.StandardCharsets; import java.sql.Timestamp; @@ -111,7 +112,6 @@ import java.util.stream.StreamSupport; import javax.annotation.Nonnull; import javax.annotation.Nullable; -import javax.persistence.EntityNotFoundException; import lombok.Getter; import lombok.extern.slf4j.Slf4j; diff --git a/metadata-io/src/main/java/com/linkedin/metadata/entity/ebean/EbeanAspectDao.java b/metadata-io/src/main/java/com/linkedin/metadata/entity/ebean/EbeanAspectDao.java index 4304be1aa2a00..6233bf5e0e35c 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/entity/ebean/EbeanAspectDao.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/entity/ebean/EbeanAspectDao.java @@ -41,6 +41,8 @@ import io.ebean.Transaction; import io.ebean.TxScope; import io.ebean.annotation.TxIsolation; +import jakarta.persistence.PersistenceException; +import jakarta.persistence.Table; import java.net.URISyntaxException; import java.sql.Timestamp; import java.time.Clock; @@ -62,8 +64,6 @@ import java.util.stream.Stream; import javax.annotation.Nonnull; import javax.annotation.Nullable; -import javax.persistence.PersistenceException; -import javax.persistence.Table; import lombok.extern.slf4j.Slf4j; @Slf4j @@ -790,10 +790,43 @@ public long getMaxVersion(@Nonnull final String urn, @Nonnull final String aspec return result.isEmpty() ? -1 : result.get(0).getVersion(); } + /** + * This method is only used as a fallback. 
It does incur an extra read-lock that is naturally a + * result of getLatestAspects(..., forUpdate=true) + * + * @param urnAspects urn and aspect names to fetch + * @return map of the aspect's next version + */ public Map> getNextVersions( @Nonnull Map> urnAspects) { validateConnection(); + List forUpdateKeys = new ArrayList<>(); + + // initialize with default next version of 0 + Map> result = + new HashMap<>( + urnAspects.entrySet().stream() + .map( + entry -> { + Map defaultNextVersion = new HashMap<>(); + entry + .getValue() + .forEach( + aspectName -> { + defaultNextVersion.put(aspectName, ASPECT_LATEST_VERSION); + forUpdateKeys.add( + new EbeanAspectV2.PrimaryKey( + entry.getKey(), aspectName, ASPECT_LATEST_VERSION)); + }); + return Map.entry(entry.getKey(), defaultNextVersion); + }) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))); + + // forUpdate is required to avoid duplicate key violations (it is used as an indication that the + // max(version) was invalidated) + _server.find(EbeanAspectV2.class).where().idIn(forUpdateKeys).forUpdate().findList(); + Junction queryJunction = _server .find(EbeanAspectV2.class) @@ -811,21 +844,11 @@ public Map> getNextVersions( } } - Map> result = new HashMap<>(); - // Default next version 0 - urnAspects.forEach( - (key, value) -> { - Map defaultNextVersion = new HashMap<>(); - value.forEach(aspectName -> defaultNextVersion.put(aspectName, 0L)); - result.put(key, defaultNextVersion); - }); - if (exp == null) { return result; } - // forUpdate is required to avoid duplicate key violations - List dbResults = exp.endOr().forUpdate().findIds(); + List dbResults = exp.endOr().findIds(); for (EbeanAspectV2.PrimaryKey key : dbResults) { if (result.get(key.getUrn()).get(key.getAspect()) <= key.getVersion()) { diff --git a/metadata-io/src/main/java/com/linkedin/metadata/entity/ebean/EbeanAspectV1.java b/metadata-io/src/main/java/com/linkedin/metadata/entity/ebean/EbeanAspectV1.java index 648b7cd6a65b0..844bc7797255c
100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/entity/ebean/EbeanAspectV1.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/entity/ebean/EbeanAspectV1.java @@ -2,29 +2,30 @@ import io.ebean.Model; import io.ebean.annotation.Index; +import jakarta.persistence.Column; +import jakarta.persistence.Embeddable; +import jakarta.persistence.EmbeddedId; +import jakarta.persistence.Entity; +import jakarta.persistence.Lob; +import jakarta.persistence.Table; +import java.io.Serializable; import java.sql.Timestamp; -import javax.persistence.Column; -import javax.persistence.Embeddable; -import javax.persistence.EmbeddedId; -import javax.persistence.Entity; -import javax.persistence.Lob; -import javax.persistence.Table; +import javax.annotation.Nonnull; import lombok.AllArgsConstructor; import lombok.EqualsAndHashCode; import lombok.Getter; import lombok.NoArgsConstructor; -import lombok.NonNull; import lombok.Setter; /** Schema definition for the legacy aspect table. */ @Getter @Setter +@NoArgsConstructor +@AllArgsConstructor @Entity @Table(name = "metadata_aspect") public class EbeanAspectV1 extends Model { - private static final long serialVersionUID = 1L; - public static final String ALL_COLUMNS = "*"; public static final String KEY_ID = "key"; public static final String URN_COLUMN = "urn"; @@ -41,16 +42,16 @@ public class EbeanAspectV1 extends Model { @AllArgsConstructor @NoArgsConstructor @EqualsAndHashCode - public static class PrimaryKey { + public static class PrimaryKey implements Serializable { private static final long serialVersionUID = 1L; - @NonNull + @Nonnull @Index @Column(name = URN_COLUMN, length = 500, nullable = false) private String urn; - @NonNull + @Nonnull @Index @Column(name = ASPECT_COLUMN, length = 200, nullable = false) private String aspect; @@ -60,18 +61,18 @@ public static class PrimaryKey { private long version; } - @NonNull @EmbeddedId @Index protected PrimaryKey key; + @Nonnull @EmbeddedId @Index protected PrimaryKey key; 
- @NonNull + @Nonnull @Lob @Column(name = METADATA_COLUMN, nullable = false) protected String metadata; - @NonNull + @Nonnull @Column(name = CREATED_ON_COLUMN, nullable = false) private Timestamp createdOn; - @NonNull + @Nonnull @Column(name = CREATED_BY_COLUMN, nullable = false) private String createdBy; diff --git a/metadata-io/src/main/java/com/linkedin/metadata/entity/ebean/EbeanAspectV2.java b/metadata-io/src/main/java/com/linkedin/metadata/entity/ebean/EbeanAspectV2.java index 71e52ed403b9b..407a47a91a454 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/entity/ebean/EbeanAspectV2.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/entity/ebean/EbeanAspectV2.java @@ -4,19 +4,19 @@ import com.linkedin.metadata.entity.EntityAspectIdentifier; import io.ebean.Model; import io.ebean.annotation.Index; +import jakarta.persistence.Column; +import jakarta.persistence.Embeddable; +import jakarta.persistence.EmbeddedId; +import jakarta.persistence.Entity; +import jakarta.persistence.Lob; +import jakarta.persistence.Table; +import java.io.Serializable; import java.sql.Timestamp; import javax.annotation.Nonnull; -import javax.persistence.Column; -import javax.persistence.Embeddable; -import javax.persistence.EmbeddedId; -import javax.persistence.Entity; -import javax.persistence.Lob; -import javax.persistence.Table; import lombok.AllArgsConstructor; import lombok.EqualsAndHashCode; import lombok.Getter; import lombok.NoArgsConstructor; -import lombok.NonNull; import lombok.Setter; /** Schema definition for the new aspect table. 
*/ @@ -28,8 +28,6 @@ @Table(name = "metadata_aspect_v2") public class EbeanAspectV2 extends Model { - private static final long serialVersionUID = 1L; - public static final String ALL_COLUMNS = "*"; public static final String KEY_ID = "key"; public static final String URN_COLUMN = "urn"; @@ -48,16 +46,16 @@ public class EbeanAspectV2 extends Model { @AllArgsConstructor @NoArgsConstructor @EqualsAndHashCode - public static class PrimaryKey { + public static class PrimaryKey implements Serializable { private static final long serialVersionUID = 1L; - @NonNull + @Nonnull @Index @Column(name = URN_COLUMN, length = 500, nullable = false) private String urn; - @NonNull + @Nonnull @Index @Column(name = ASPECT_COLUMN, length = 200, nullable = false) private String aspect; @@ -75,29 +73,29 @@ public EntityAspectIdentifier toAspectIdentifier() { } } - @NonNull @EmbeddedId @Index protected PrimaryKey key; + @Nonnull @EmbeddedId @Index protected PrimaryKey key; - @NonNull + @Nonnull @Column(name = URN_COLUMN, length = 500, nullable = false) private String urn; - @NonNull + @Nonnull @Column(name = ASPECT_COLUMN, length = 200, nullable = false) private String aspect; @Column(name = VERSION_COLUMN, nullable = false) private long version; - @NonNull + @Nonnull @Lob @Column(name = METADATA_COLUMN, nullable = false) protected String metadata; - @NonNull + @Nonnull @Column(name = CREATED_ON_COLUMN, nullable = false) private Timestamp createdOn; - @NonNull + @Nonnull @Column(name = CREATED_BY_COLUMN, nullable = false) private String createdBy; diff --git a/metadata-io/src/test/java/com/linkedin/metadata/entity/ebean/EbeanAspectDaoTest.java b/metadata-io/src/test/java/com/linkedin/metadata/entity/ebean/EbeanAspectDaoTest.java new file mode 100644 index 0000000000000..43123fb9872a0 --- /dev/null +++ b/metadata-io/src/test/java/com/linkedin/metadata/entity/ebean/EbeanAspectDaoTest.java @@ -0,0 +1,72 @@ +package com.linkedin.metadata.entity.ebean; + +import static 
org.mockito.Mockito.mock; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertTrue; + +import com.linkedin.metadata.EbeanTestUtils; +import com.linkedin.metadata.aspect.batch.AspectsBatch; +import com.linkedin.metadata.config.EbeanConfiguration; +import com.linkedin.metadata.entity.EbeanEntityServiceTest; +import io.ebean.Database; +import io.ebean.test.LoggedSql; +import java.util.List; +import java.util.Map; +import java.util.Set; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +public class EbeanAspectDaoTest { + + private EbeanAspectDao testDao; + + @BeforeMethod + public void setupTest() { + Database server = EbeanTestUtils.createTestServer(EbeanEntityServiceTest.class.getSimpleName()); + testDao = new EbeanAspectDao(server, EbeanConfiguration.testDefault); + } + + @Test + public void testGetNextVersionForUpdate() { + LoggedSql.start(); + + testDao.runInTransactionWithRetryUnlocked( + (txContext) -> { + testDao.getNextVersions(Map.of("urn:li:corpuser:test", Set.of("status"))); + return ""; + }, + mock(AspectsBatch.class), + 0); + + // Get the captured SQL statements + List sql = + LoggedSql.stop().stream() + .filter(str -> !str.contains("INFORMATION_SCHEMA.TABLES")) + .toList(); + assertEquals(sql.size(), 2, String.format("Found: %s", sql)); + assertTrue( + sql.get(0).contains("for update;"), String.format("Did not find `for update` in %s ", sql)); + } + + @Test + public void testGetLatestAspectsForUpdate() { + LoggedSql.start(); + + testDao.runInTransactionWithRetryUnlocked( + (txContext) -> { + testDao.getLatestAspects(Map.of("urn:li:corpuser:test", Set.of("status")), true); + return ""; + }, + mock(AspectsBatch.class), + 0); + + // Get the captured SQL statements + List sql = + LoggedSql.stop().stream() + .filter(str -> !str.contains("INFORMATION_SCHEMA.TABLES")) + .toList(); + assertEquals(sql.size(), 1, String.format("Found: %s", sql)); + assertTrue( + sql.get(0).contains("for 
update;"), String.format("Did not find `for update` in %s ", sql)); + } +}