Promoting beta spark lineage plugin
Renaming old spark plugin to legacy
Adding kafka emitter
Bumping OpenLineage to 0.17.1
treff7es committed Jul 10, 2024
1 parent 44302a1 commit 5fb85b7
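The commit message above mentions a new Kafka emitter. Selecting it would presumably go through the plugin's Spark configuration; a sketch of what that might look like (these property names are assumptions modeled on the plugin's `spark.datahub.*` convention and are not confirmed by this diff — check the plugin README):

```properties
# Assumed keys, for illustration only -- verify against the acryl-spark-lineage docs.
spark.datahub.emitter            kafka
spark.datahub.kafka.bootstrap    broker-1:9092
```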
Showing 120 changed files with 29 additions and 21 deletions.
2 changes: 1 addition & 1 deletion build.gradle
@@ -54,7 +54,7 @@ buildscript {
ext.hazelcastVersion = '5.3.6'
ext.ebeanVersion = '12.16.1'
ext.googleJavaFormatVersion = '1.18.1'
-ext.openLineageVersion = '1.16.0'
+ext.openLineageVersion = '1.17.1'
ext.logbackClassicJava8 = '1.2.12'

ext.docker_registry = 'acryldata'
@@ -340,17 +340,20 @@ log4j.logger.datahub.client.rest=DEBUG
Use Java 8 to build the project. The project uses Gradle as the build tool. To build the project, run the following command:
```shell
-./gradlew -PjavaClassVersionDefault=8 :metadata-integration:java:spark-lineage-beta:shadowJar
+./gradlew -PjavaClassVersionDefault=8 :metadata-integration:java:acryl-spark-lineage:shadowJar
```
## Known limitations
## Changelog
### Version 0.2.13
- Add kafka emitter to emit lineage to kafka
### Version 0.2.12
- Silencing some chatty warnings in RddPathUtils
### Version 0.2.11
- Add option to lowercase dataset URNs
- Add option to set the platform instance and/or env per platform with the `spark.datahub.platform.<platform_name>.env` and `spark.datahub.platform.<platform_name>.platform_instance` config parameters
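For illustration, the per-platform settings described in the 0.2.11 entry above could be supplied in `spark-defaults.conf` like this (the `snowflake` platform name and the values are hypothetical; only the key pattern comes from the changelog):

```properties
# Hypothetical example: pin snowflake-backed datasets to the PROD env
# and tag them with a platform instance.
spark.datahub.platform.snowflake.env                 PROD
spark.datahub.platform.snowflake.platform_instance   warehouse_1
```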
@@ -287,13 +287,8 @@ private static void initializeMetrics(OpenLineageConfig openLineageConfig) {
} else {
disabledFacets = "";
}
-    meterRegistry
-        .config()
-        .commonTags(
-            Tags.of(
-                Tag.of("openlineage.spark.integration.version", Versions.getVersion()),
-                Tag.of("openlineage.spark.version", sparkVersion),
-                Tag.of("openlineage.spark.disabled.facets", disabledFacets)));
+    meterRegistry.config();

((CompositeMeterRegistry) meterRegistry)
.getRegistries()
.forEach(
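The removed block above applied common tags once on the composite registry via Micrometer's `MeterRegistry.Config.commonTags`; the surviving `forEach` over `getRegistries()` tags each child registry instead. A minimal standalone sketch of that per-child pattern (plain Micrometer with a hypothetical tag value, not this project's code):

```java
import io.micrometer.core.instrument.Tag;
import io.micrometer.core.instrument.Tags;
import io.micrometer.core.instrument.composite.CompositeMeterRegistry;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;

public class CommonTagsSketch {
  public static void main(String[] args) {
    CompositeMeterRegistry composite = new CompositeMeterRegistry();
    composite.add(new SimpleMeterRegistry());

    // Tag each child registry rather than the composite itself, mirroring
    // the forEach-over-getRegistries() shape kept by the diff.
    composite
        .getRegistries()
        .forEach(
            registry ->
                registry
                    .config()
                    .commonTags(Tags.of(Tag.of("openlineage.spark.version", "3.5"))));

    // Meters created through the composite pick up each child's common tags.
    composite.counter("emitted.events").increment();
    double count =
        composite.getRegistries().iterator().next()
            .get("emitted.events")
            .tag("openlineage.spark.version", "3.5")
            .counter()
            .count();
    System.out.println(count); // expected 1.0
  }
}
```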
@@ -38,7 +38,7 @@ public List<OpenLineage.OutputDataset> apply(LogicalPlan x) {
OpenLineage.OutputDataset outputDataset;

if (catalogTable.isEmpty()) {
-      DatasetIdentifier di = PathUtils.fromURI(command.outputPath().toUri(), "file");
+      DatasetIdentifier di = PathUtils.fromPath(command.outputPath());
if (SaveMode.Overwrite == command.mode()) {
outputDataset =
outputDataset()
@@ -51,29 +51,43 @@ public List<OpenLineage.OutputDataset> apply(LogicalPlan x) {
}
return Collections.singletonList(outputDataset);
} else {

if (!context.getSparkSession().isPresent()) {
return Collections.emptyList();
}

if (SaveMode.Overwrite == command.mode()) {
return Collections.singletonList(
outputDataset()
.getDataset(
-                  PathUtils.fromCatalogTable(catalogTable.get()),
+                  PathUtils.fromCatalogTable(catalogTable.get(), context.getSparkSession().get()),
catalogTable.get().schema(),
OpenLineage.LifecycleStateChangeDatasetFacet.LifecycleStateChange.CREATE));
} else {
return Collections.singletonList(
outputDataset()
.getDataset(
-                  PathUtils.fromCatalogTable(catalogTable.get()), catalogTable.get().schema()));
+                  PathUtils.fromCatalogTable(catalogTable.get(), context.getSparkSession().get()),
+                  catalogTable.get().schema()));
}
}
}

@Override
public Optional<String> jobNameSuffix(InsertIntoHadoopFsRelationCommand command) {
if (command.catalogTable().isEmpty()) {
-      DatasetIdentifier di = PathUtils.fromURI(command.outputPath().toUri(), "file");
+      DatasetIdentifier di = PathUtils.fromPath(command.outputPath());
return Optional.of(trimPath(di.getName()));
}

if (!context.getSparkSession().isPresent()) {
return Optional.empty();
}

return Optional.of(
-        trimPath(PathUtils.fromCatalogTable(command.catalogTable().get()).getName()));
+        trimPath(
+            PathUtils.fromCatalogTable(
+                    command.catalogTable().get(), context.getSparkSession().get())
+                .getName()));
}
}
2 changes: 0 additions & 2 deletions metadata-integration/java/spark-lineage/bin/.gitignore

This file was deleted.


4 changes: 2 additions & 2 deletions settings.gradle
@@ -52,13 +52,13 @@ include 'metadata-perf'
include 'docs-website'
include 'metadata-models-custom'
include 'entity-registry:custom-test-model'
-include 'metadata-integration:java:spark-lineage'
+include 'metadata-integration:java:spark-lineage-legacy'
include 'metadata-integration:java:datahub-client'
include 'metadata-integration:java:custom-plugin-lib'
include 'metadata-integration:java:datahub-event'
include 'metadata-integration:java:datahub-protobuf'
include 'metadata-integration:java:openlineage-converter'
-include 'metadata-integration:java:spark-lineage-beta'
+include 'metadata-integration:java:acryl-spark-lineage'
include 'ingestion-scheduler'
include 'metadata-ingestion-modules:airflow-plugin'
include 'metadata-ingestion-modules:dagster-plugin'
