Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(timeline): fixes primary key change events #11819

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -515,8 +515,8 @@ private static List<ChangeEvent> getPrimaryKeyChangeEvents(
SchemaMetadata targetSchema,
Urn datasetUrn,
AuditStamp auditStamp) {
List<ChangeEvent> primaryKeyChangeEvents = new ArrayList<>();
if (changeCategories != null && changeCategories.contains(ChangeCategory.TECHNICAL_SCHEMA)) {
List<ChangeEvent> primaryKeyChangeEvents = new ArrayList<>();
Set<String> basePrimaryKeys =
(baseSchema != null && baseSchema.getPrimaryKeys() != null)
? new HashSet<>(baseSchema.getPrimaryKeys())
Expand All @@ -529,51 +529,53 @@ private static List<ChangeEvent> getPrimaryKeyChangeEvents(
basePrimaryKeys.stream()
.filter(key -> !targetPrimaryKeys.contains(key))
.collect(Collectors.toSet());
for (String removedBaseKeyField : removedBaseKeys) {
Urn schemaFieldUrn = getSchemaFieldUrn(datasetUrn.toString(), removedBaseKeyField);
primaryKeyChangeEvents.add(
DatasetSchemaFieldChangeEvent.schemaFieldChangeEventBuilder()
.category(ChangeCategory.TECHNICAL_SCHEMA)
.modifier(schemaFieldUrn.toString())
.fieldUrn(schemaFieldUrn)
.fieldPath(removedBaseKeyField)
.entityUrn(datasetUrn.toString())
.operation(ChangeOperation.MODIFY)
.semVerChange(SemanticChangeType.MAJOR)
.description(
BACKWARDS_INCOMPATIBLE_DESC
+ " removal of the primary key field '"
+ removedBaseKeyField
+ "'")
.auditStamp(auditStamp)
.modificationCategory(SchemaFieldModificationCategory.OTHER)
.build());
}

Set<String> addedTargetKeys =
targetPrimaryKeys.stream()
.filter(key -> !basePrimaryKeys.contains(key))
.collect(Collectors.toSet());
if (!removedBaseKeys.isEmpty() || !addedTargetKeys.isEmpty()) {
String keyChangeTarget;
// Just pick the first schema field we can find for the change event
if (!removedBaseKeys.isEmpty()) {
keyChangeTarget = removedBaseKeys.stream().findFirst().get();
} else {
keyChangeTarget = addedTargetKeys.stream().findFirst().get();
}

StringBuilder description =
new StringBuilder(BACKWARDS_INCOMPATIBLE_DESC + " a primary key constraint change.");
if (!removedBaseKeys.isEmpty()) {
description.append(" The following fields were removed:");
removedBaseKeys.forEach(
removedBaseKey -> description.append(" '").append(removedBaseKey).append("'"));
description.append(".");
}
if (!addedTargetKeys.isEmpty()) {
description.append(" The following fields were added:");
addedTargetKeys.forEach(
addedTargetKey -> description.append(" '").append(addedTargetKey).append("'"));
description.append(".");
}
for (String addedTargetKeyField : addedTargetKeys) {
Urn schemaFieldUrn = getSchemaFieldUrn(datasetUrn.toString(), addedTargetKeyField);
primaryKeyChangeEvents.add(
DatasetSchemaFieldChangeEvent.schemaFieldChangeEventBuilder()
.category(ChangeCategory.TECHNICAL_SCHEMA)
.fieldUrn(getSchemaFieldUrn(datasetUrn, keyChangeTarget))
.fieldPath(keyChangeTarget)
.modifier(getSchemaFieldUrn(datasetUrn, keyChangeTarget).toString())
.modifier(getSchemaFieldUrn(datasetUrn, addedTargetKeyField).toString())
.fieldUrn(schemaFieldUrn)
.fieldPath(addedTargetKeyField)
.entityUrn(datasetUrn.toString())
.operation(ChangeOperation.MODIFY)
.semVerChange(SemanticChangeType.MAJOR)
.description(description.toString())
.modificationCategory(SchemaFieldModificationCategory.OTHER)
.description(
BACKWARDS_INCOMPATIBLE_DESC
+ " addition of the primary key field '"
+ addedTargetKeyField
+ "'")
.auditStamp(auditStamp)
.modificationCategory(SchemaFieldModificationCategory.OTHER)
.build());
return primaryKeyChangeEvents;
}
}
return Collections.emptyList();
return primaryKeyChangeEvents;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -204,10 +204,10 @@ public void testSchemaFieldPrimaryKeyChange() throws Exception {
List<ChangeEvent> actual = test.getChangeEvents(urn, entity, aspect, from, to3, auditStamp);
compareDescriptions(
Set.of(
"A backwards incompatible change due to a primary key constraint change. "
+ "The following fields were removed: 'ID'. The following fields were added: 'ID2'."),
"A backwards incompatible change due to addition of the primary key field 'ID2'",
"A backwards incompatible change due to removal of the primary key field 'ID'"),
actual);
assertEquals(1, actual.size());
assertEquals(actual.size(), 2);
compareModificationCategories(Set.of(SchemaFieldModificationCategory.OTHER.toString()), actual);
}

Expand Down Expand Up @@ -274,10 +274,10 @@ public void testSchemaFieldPrimaryKeyChangeRenameAdd() throws Exception {
List<ChangeEvent> actual = test.getChangeEvents(urn, entity, aspect, from, to3, auditStamp);
compareDescriptions(
Set.of(
"A backwards incompatible change due to a primary key constraint change. "
+ "The following fields were removed: 'ID'. The following fields were added: 'ID2'."),
"A backwards incompatible change due to addition of the primary key field 'ID2'",
"A backwards incompatible change due to removal of the primary key field 'ID'"),
actual);
assertEquals(1, actual.size());
assertEquals(actual.size(), 2);
compareModificationCategories(Set.of(SchemaFieldModificationCategory.OTHER.toString()), actual);

Aspect<SchemaMetadata> to4 =
Expand Down
Loading