From a1e993b61fec30a3477a103d381a2783c2b95731 Mon Sep 17 00:00:00 2001 From: Trustpilot Robot User Date: Wed, 12 May 2021 12:30:19 +0100 Subject: [PATCH] Sanitises invalid Avro field names (#9) Avro is [quite restrictive in what characters are allowed when defining a field](https://avro.apache.org/docs/current/spec.html#names) but DynamoDB [would happily allow characters](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/HowItWorks.NamingRulesDataTypes.html) like `- (dash)` or `. (dot)`. We updated the connectors code so that field names are sanitised so that they match Avro's field naming convention by replacing any invalid characters with `_ (underscore)`. https://trello.com/c/Dhv4FIod/1198-dynamodb-kafka-connector-sanitise-invalid-field-names Co-authored-by: Emilio Larrambebere --- .../dynamodb/utils/RecordConverter.java | 24 ++++- .../dynamodb/utils/RecordConverterTests.java | 87 +++++++++++++++++++ 2 files changed, 108 insertions(+), 3 deletions(-) diff --git a/source/src/main/java/com/trustpilot/connector/dynamodb/utils/RecordConverter.java b/source/src/main/java/com/trustpilot/connector/dynamodb/utils/RecordConverter.java index 1395c00..abb8039 100644 --- a/source/src/main/java/com/trustpilot/connector/dynamodb/utils/RecordConverter.java +++ b/source/src/main/java/com/trustpilot/connector/dynamodb/utils/RecordConverter.java @@ -14,8 +14,10 @@ import java.time.Instant; import java.util.Collections; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.stream.Collectors; import static java.util.stream.Collectors.toList; @@ -62,6 +64,15 @@ public SourceRecord toSourceRecord( String shardId, String sequenceNumber) throws Exception { + // Sanitise the incoming attributes to remove any invalid Avro characters + final Map sanitisedAttributes = attributes.entrySet().stream() + .collect(Collectors.toMap( + e -> this.sanitiseAttributeName(e.getKey()), + Map.Entry::getValue, + (u, v) -> u, + LinkedHashMap::new + )); + // Leveraging offsets to store shard and sequence number with each item pushed to Kafka. // This info will only be used to update `shardRegister` and won't be used to reset state after restart Map offsets = SourceInfo.toOffset(sourceInfo); @@ -70,13 +81,13 @@ public SourceRecord toSourceRecord( // DynamoDB keys can be changed only by recreating the table if (keySchema == null) { - keys = tableDesc.getKeySchema().stream().map(KeySchemaElement::getAttributeName).collect(toList()); + keys = tableDesc.getKeySchema().stream().map(this::sanitiseAttributeName).collect(toList()); keySchema = getKeySchema(keys); } Struct keyData = new Struct(getKeySchema(keys)); for (String key : keys) { - AttributeValue attributeValue = attributes.get(key); + AttributeValue attributeValue = sanitisedAttributes.get(key); if (attributeValue.getS() != null) { keyData.put(key, attributeValue.getS()); continue; @@ -89,7 +100,7 @@ public SourceRecord toSourceRecord( Struct valueData = new Struct(valueSchema) .put(Envelope.FieldName.VERSION, sourceInfo.version) - .put(Envelope.FieldName.DOCUMENT, objectMapper.writeValueAsString(attributes)) + .put(Envelope.FieldName.DOCUMENT, objectMapper.writeValueAsString(sanitisedAttributes)) .put(Envelope.FieldName.SOURCE, SourceInfo.toStruct(sourceInfo)) .put(Envelope.FieldName.OPERATION, op.code()) .put(Envelope.FieldName.TIMESTAMP, arrivalTimestamp.toEpochMilli()); @@ -113,4 +124,11 @@ private Schema getKeySchema(List keys) { return keySchemaBuilder.build(); } + private String sanitiseAttributeName(KeySchemaElement element) { + return this.sanitiseAttributeName(element.getAttributeName()); + } + + private String sanitiseAttributeName(final String attributeName) { + return attributeName.replaceAll("^[^a-zA-Z_]|(? getAttributes() { return attributes; } + private Map getAttributesWithInvalidAvroCharacters() { + Map attributes = new HashMap<>(); + attributes.put("test-1234", new AttributeValue().withS("testKV1Value")); + attributes.put("1-starts-with-number", new AttributeValue().withS("2")); + attributes.put("_starts_with_underscore", new AttributeValue().withN("1")); + attributes.put("test!@£$%^", new AttributeValue().withS("testStringValue")); + + return attributes; + } + + + private SourceInfo getSourceInfo(String table) { SourceInfo sourceInfo = new SourceInfo(table, Clock.fixed(Instant.parse("2001-01-02T00:00:00Z"), ZoneId.of("UTC"))); sourceInfo.initSyncStatus = InitSyncStatus.RUNNING; @@ -191,6 +203,81 @@ public void recordAttributesAreAddedToValueData() throws Exception { ((Struct) record.value()).getString("document")); } + @Test + public void singleItemKeyIsAddedToRecordWhenKeyContainsInvalidCharacters() throws Exception { + // Arrange + List keySchema = new LinkedList<>(); + keySchema.add(new KeySchemaElement().withKeyType("S").withAttributeName("test-1234")); + + RecordConverter converter = new RecordConverter(getTableDescription(keySchema), "TestTopicPrefix-"); + + // Act + SourceRecord record = converter.toSourceRecord( + getSourceInfo(table), + Envelope.Operation.forCode("r"), + getAttributesWithInvalidAvroCharacters(), + Instant.parse("2001-01-02T00:00:00.00Z"), + "testShardID1", + "testSequenceNumberID1" + ); + + // Assert + assertEquals("test_1234", record.keySchema().fields().get(0).name()); + assertEquals(SchemaBuilder.string().build(), record.keySchema().fields().get(0).schema()); + assertEquals("testKV1Value", ((Struct) record.key()).getString("test_1234")); + } + + @Test + public void multiItemKeyIsAddedToRecordWhenKeyContainsInvalidCharacters() throws Exception { + // Arrange + List keySchema = new LinkedList<>(); + keySchema.add(new KeySchemaElement().withKeyType("S").withAttributeName("test-1234")); + keySchema.add(new KeySchemaElement().withKeyType("N").withAttributeName("1-starts-with-number")); + + RecordConverter converter = new RecordConverter(getTableDescription(keySchema), "TestTopicPrefix-"); + + // Act + SourceRecord record = converter.toSourceRecord( + getSourceInfo(table), + Envelope.Operation.forCode("r"), + getAttributesWithInvalidAvroCharacters(), + Instant.parse("2001-01-02T00:00:00.00Z"), + "testShardID1", + "testSequenceNumberID1" + ); + + // Assert + assertEquals("test_1234", record.keySchema().fields().get(0).name()); + assertEquals(SchemaBuilder.string().build(), record.keySchema().fields().get(0).schema()); + assertEquals("testKV1Value", ((Struct) record.key()).getString("test_1234")); + + assertEquals("__starts_with_number", record.keySchema().fields().get(1).name()); + assertEquals(SchemaBuilder.string().build(), record.keySchema().fields().get(1).schema()); + assertEquals("2", ((Struct) record.key()).getString("__starts_with_number")); + } + + @Test + public void recordAttributesAreAddedToValueDataWhenAttributesContainsInvalidCharacters() throws Exception { + // Arrange + RecordConverter converter = new RecordConverter(getTableDescription(null), "TestTopicPrefix-"); + + // Act + SourceRecord record = converter.toSourceRecord( + getSourceInfo(table), + Envelope.Operation.forCode("r"), + getAttributesWithInvalidAvroCharacters(), + Instant.parse("2001-01-02T00:00:00.00Z"), + "testShardID1", + "testSequenceNumberID1" + ); + + String expected = "{\"test_1234\":{\"s\":\"testKV1Value\"},\"_starts_with_underscore\":{\"n\":\"1\"},\"__starts_with_number\":{\"s\":\"2\"},\"test______\":{\"s\":\"testStringValue\"}}"; + + // Assert + assertEquals(expected, + ((Struct) record.value()).getString("document")); + } + @Test public void sourceInfoIsAddedToValueData() throws Exception { // Arrange