Fix reading ByteBuffer from Avro, inserting base64 encoded strings into postgres BYTEA columns and cast UUID arrays. (#2174)

Co-authored-by: Claude <cvandermerwe@google.com>
claudevdm and Claude authored Feb 13, 2025
1 parent 30020c0 commit 1a176fa
Showing 6 changed files with 90 additions and 5 deletions.
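The core of the fix, diffed below, is normalizing Avro BYTES values, which the Avro reader can hand back either as a ByteBuffer or as a raw byte[]. A minimal, self-contained sketch of that conversion follows; the class and method names here are illustrative, not part of the template code.

import java.nio.ByteBuffer;
import java.util.Base64;

public class AvroBytesSketch {

  /** Normalizes an Avro BYTES value, which may surface as a ByteBuffer or a byte[]. */
  static byte[] toByteArray(Object avroValue) {
    if (avroValue instanceof ByteBuffer) {
      ByteBuffer buffer = (ByteBuffer) avroValue;
      byte[] bytes = new byte[buffer.remaining()];
      buffer.get(bytes); // copies the readable bytes; works for direct buffers too
      return bytes;
    } else if (avroValue instanceof byte[]) {
      return (byte[]) avroValue;
    }
    throw new IllegalArgumentException(
        "Unexpected type for BYTES field: " + avroValue.getClass().getName());
  }

  public static void main(String[] args) {
    ByteBuffer buffer =
        ByteBuffer.wrap(new byte[] {(byte) 0xde, (byte) 0xad, (byte) 0xbe, (byte) 0xef});
    // Jackson serializes byte[] node values as base64; Base64 here just mirrors that,
    // which is why the tests below expect "3q2+7w==".
    System.out.println(Base64.getEncoder().encodeToString(toByteArray(buffer)));
  }
}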
@@ -160,7 +160,6 @@ public FailsafeElement<String, String> apply(GenericRecord record) {

// All Raw Metadata
outputObject.put("_metadata_source", getSourceMetadataJson(record));

return FailsafeElement.of(outputObject.toString(), outputObject.toString());
}

@@ -386,7 +385,22 @@ static void putField(
jsonObject.put(fieldName, (Boolean) record.get(fieldName));
break;
case BYTES:
jsonObject.put(fieldName, (byte[]) record.get(fieldName));
if (record.get(fieldName) instanceof ByteBuffer) {
ByteBuffer byteBuffer = (ByteBuffer) record.get(fieldName);
byte[] byteArray = new byte[byteBuffer.remaining()];
byteBuffer.get(byteArray);
jsonObject.put(fieldName, byteArray);
} else if (record.get(fieldName) instanceof byte[]) {
jsonObject.put(fieldName, (byte[]) record.get(fieldName));
} else {
// Any other runtime type for a BYTES field is unexpected; fail fast with a descriptive error.
throw new IllegalArgumentException(
"Unexpected type for field "
+ fieldName
+ ": "
+ record.get(fieldName).getClass().getName());
}
break;
case FLOAT:
String value = record.get(fieldName).toString();
@@ -96,7 +96,7 @@ public class FormatDatastreamRecordToJsonTest {
+ "\"created_at\":\"2020-02-12T00:00:00Z\","
+ "\"datetime_at\":\"2020-02-12T00:00:00Z\","
+ "\"_0col\":1,"
+ "\"timestamp_with_tz\":\"2022-10-13T14:30:00Z\","
+ "\"timestamp_with_tz\":\"2020-10-13T14:30:00.056483Z\","
+ "\"_metadata_stream\":"
+ "\"projects/269744978479/locations/us-central1/streams/datastream-test-fbefaf33\","
+ "\"_metadata_timestamp\":1623459160,"
@@ -140,6 +140,7 @@ record = dataFileReader.next();
}
}

@Test
public void testParseMySQLPeoplePrimaryKeys() throws IOException, URISyntaxException {
URL resource =
getClass()
@@ -163,6 +164,7 @@ record = dataFileReader.next();
}
}

@Test
public void testParseMySQLNumbers() throws IOException, URISyntaxException {
URL resource =
getClass()
@@ -182,6 +184,22 @@ public void testParseMySQLNumbers() throws IOException, URISyntaxException {
assertEquals(EXPECTED_NUMERIC_RECORD, changeEvent.toString());
}

@Test
public void testPostgresByteArray() throws IOException, URISyntaxException {
URL resource =
getClass().getClassLoader().getResource("FormatDatastreamRecordToJsonTest/bytearray.avro");
File file = new File(resource.toURI());
DatumReader<GenericRecord> datumReader = new GenericDatumReader<>();
DataFileReader<GenericRecord> dataFileReader = new DataFileReader<>(file, datumReader);
GenericRecord record = dataFileReader.next();
String jsonData = FormatDatastreamRecordToJson.create().apply(record).getOriginalPayload();
ObjectMapper mapper = new ObjectMapper();
JsonNode changeEvent = mapper.readTree(jsonData);
// The avro file contains binary_content: b'\xde\xad\xbe\xef', which is converted to a
// base64 encoded string by the Jackson library.
assertEquals("3q2+7w==", changeEvent.get("binary_content").textValue());
}

@Test
public void testHashRowId_valid() {
assertEquals(0L, FormatDatastreamRecord.hashRowIdToInt("AAAAAAAA++++++++++"));
Binary file not shown.
@@ -97,7 +97,6 @@ public DatastreamToDML withSchemaMap(Map<String, String> schemaMap) {
public void processElement(ProcessContext context) {
FailsafeElement<String, String> element = context.element();
String jsonString = element.getPayload();

ObjectMapper mapper = new ObjectMapper();
JsonNode rowObj;

@@ -284,7 +283,6 @@ public Map<String, String> getSqlTemplateValues(

public String getValueSql(JsonNode rowObj, String columnName, Map<String, String> tableSchema) {
String columnValue;

JsonNode columnObj = rowObj.get(columnName);
if (columnObj == null) {
LOG.warn("Missing Required Value: {} in {}", columnName, rowObj.toString());
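In the DatastreamToPostgresDML changes diffed below, the JSON payload already carries BYTEA values as base64 strings (per the test comment above), so the column mapping only needs to wrap the rendered literal in decode(), and UUID arrays only need an explicit cast. A minimal standalone sketch of that mapping, assuming the value has already been rendered as a quoted SQL literal or an ARRAY[...] expression; the class and method names are illustrative, not the template's actual code.

public class PostgresValueCastSketch {

  /** Wraps an already-rendered SQL literal in the expression Postgres needs for the column type. */
  static String castForColumnType(String renderedLiteral, String columnType) {
    switch (columnType) {
      case "BYTEA":
        // The literal is a base64 string such as '3q2+7w=='; decode it back to bytes.
        return "decode(" + renderedLiteral + ",'base64')";
      case "_UUID":
        // Array literals need an explicit cast before insertion into uuid[] columns.
        return renderedLiteral + "::uuid[]";
      default:
        return renderedLiteral;
    }
  }

  public static void main(String[] args) {
    System.out.println(castForColumnType("'3q2+7w=='", "BYTEA"));
    // decode('3q2+7w==','base64')
    System.out.println(
        castForColumnType("ARRAY['a0eebc99-9c0b-4ef8-bb6d-6bb9bd380a13']", "_UUID"));
    // ARRAY['a0eebc99-9c0b-4ef8-bb6d-6bb9bd380a13']::uuid[]
  }
}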
@@ -108,6 +108,9 @@ public String cleanDataTypeValueSql(
break;
case "INTERVAL":
return convertJsonToPostgresInterval(columnValue, columnName);
case "BYTEA":
// Byte arrays are converted to a base64 string representation upstream, so decode them in Postgres.
return "decode(" + columnValue + ",'base64')";
}

// Arrays in Postgres are prefixed with underscore e.g. _INT4 for integer array.
@@ -194,6 +197,10 @@ private String convertJsonToPostgresArray(String jsonValue, String dataType, Str
// Cast string array to jsonb array.
return arrayStatement + "::jsonb[]";
}
if (dataType.equals("_UUID")) {
// Cast string array to uuid array.
return arrayStatement + "::uuid[]";
}
return arrayStatement;

} catch (JsonProcessingException e) {
@@ -195,6 +195,54 @@ public void testJsonbArray() {
assertEquals(expected, actual);
}

/**
* Test whether {@link DatastreamToPostgresDML#getValueSql(JsonNode, String, Map)} converts a
* UUID array into the correct PostgreSQL array syntax with type casting.
*/
@Test
public void testUuidArray() {
String arrayJson =
"{\"uuid_array\": {"
+ "\"nestedArray\": ["
+ " {\"nestedArray\": null, \"elementValue\": \"a0eebc99-9c0b-4ef8-bb6d-6bb9bd380a13\"},"
+ " {\"nestedArray\": null, \"elementValue\": \"a0eebc99-9c0b-4ef8-bb6d-6bb9bd380a14\"}"
+ "], \"elementValue\": null}}";
JsonNode rowObj = getRowObj(arrayJson);
Map<String, String> tableSchema = new HashMap<>();
tableSchema.put("uuid_array", "_UUID");
DatastreamToPostgresDML dml = DatastreamToPostgresDML.of(null);
String expected =
"ARRAY['a0eebc99-9c0b-4ef8-bb6d-6bb9bd380a13','a0eebc99-9c0b-4ef8-bb6d-6bb9bd380a14']::uuid[]";

String actual = dml.getValueSql(rowObj, "uuid_array", tableSchema);

assertEquals(expected, actual);
}

/**
* Test whether {@link DatastreamToPostgresDML#getValueSql(JsonNode, String, Map)} converts a
* base64 encoded byte array value into the correct PostgreSQL decode expression.
*/
@Test
public void testByteArray() {
// Byte arrays are converted to base64 encoded strings by Jackson ObjectNode.toString() in
// FormatDatastreamRecordToJson.
String arrayJson = "{\"binary_content\": \"3q2+7w==\"}";
JsonNode rowObj = getRowObj(arrayJson);

Map<String, String> tableSchema = new HashMap<>();
tableSchema.put("binary_content", "BYTEA");

DatastreamToPostgresDML dml = DatastreamToPostgresDML.of(null);

// getValueSql wraps the already base64 encoded string in a Postgres decode() expression
String expected = "decode('3q2+7w==','base64')";

String actual = dml.getValueSql(rowObj, "binary_content", tableSchema);

assertEquals(expected, actual);
}

/**
* Test whether {@link DatastreamToPostgresDML#getValueSql(JsonNode, String, Map)} converts a JSON
* array into the correct PostgreSQL array syntax with type casting.
