Skip to content

Commit

Permalink
update pom
Browse files Browse the repository at this point in the history
  • Loading branch information
annie-mac committed Feb 27, 2024
1 parent 1c7d9b5 commit ac1a96f
Show file tree
Hide file tree
Showing 6 changed files with 34 additions and 31 deletions.
2 changes: 1 addition & 1 deletion eng/versioning/external_dependencies.txt
Original file line number Diff line number Diff line change
Expand Up @@ -391,7 +391,7 @@ cosmos_org.scalastyle:scalastyle-maven-plugin;1.0.0
## Cosmos Kafka connector under sdk\cosmos\azure-cosmos-kafka-connect\pom.xml
# Cosmos Kafka connector runtime dependencies
cosmos_org.apache.kafka:connect-api;3.6.0
cosmos_org.com.jayway.jsonpath:json-path;2.9.0
cosmos_com.jayway.jsonpath:json-path;2.9.0
# Cosmos Kafka connector tests only
cosmos_org.apache.kafka:connect-runtime;3.6.0
# Maven Tools for Cosmos Kafka connector only
Expand Down
3 changes: 2 additions & 1 deletion sdk/cosmos/azure-cosmos-kafka-connect/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ Licensed under the MIT License.
</dependency>
<dependency>
<groupId>com.jayway.jsonpath</groupId>
<artifactId>json-path</artifactId> <!-- {x-version-update;cosmos_org.com.jayway.jsonpath:json-path;external_dependency} -->
<artifactId>json-path</artifactId> <!-- {x-version-update;cosmos_com.jayway.jsonpath:json-path;external_dependency} -->
<version>2.9.0</version>
</dependency>

Expand Down Expand Up @@ -234,6 +234,7 @@ Licensed under the MIT License.
<include>com.azure:*</include>
<include>org.apache.kafka:connect-api:[3.6.0]</include> <!-- {x-include-update;cosmos_org.apache.kafka:connect-api;external_dependency} -->
<include>io.confluent:kafka-connect-maven-plugin:[0.12.0]</include> <!-- {x-include-update;cosmos_io.confluent:kafka-connect-maven-plugin;external_dependency} -->
<include>com.jayway.jsonpath:json-path:[2.9.0]</include> <!-- {x-version-update;cosmos_org.com.jayway.jsonpath:json-path;external_dependency} -->
</includes>
</bannedDependencies>
</rules>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,16 +61,19 @@ public void write(CosmosAsyncContainer container, List<SinkRecord> sinkRecords)
}
}

@SuppressWarnings("unchecked")
protected String getId(Object recordValue) {
checkArgument(recordValue instanceof Map, "Argument 'recordValue' is not valid map format.");
return ((Map<String, Object>) recordValue).get(ID).toString();
}

@SuppressWarnings("unchecked")
protected String getEtag(Object recordValue) {
checkArgument(recordValue instanceof Map, "Argument 'recordValue' is not valid map format.");
return ((Map<String, Object>) recordValue).getOrDefault(ETAG, Strings.Emtpy).toString();
}

@SuppressWarnings("unchecked")
protected PartitionKey getPartitionKeyValue(Object recordValue, PartitionKeyDefinition partitionKeyDefinition) {
checkArgument(recordValue instanceof Map, "Argument 'recordValue' is not valid map format.");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ public SinkRecordTransformer(CosmosSinkTaskConfig sinkTaskConfig) {
this.idStrategy = this.createIdStrategy(sinkTaskConfig);
}

@SuppressWarnings("unchecked")
public List<SinkRecord> transform(String containerName, List<SinkRecord> sinkRecords) {
List<SinkRecord> toBeWrittenRecordList = new ArrayList<>();
for (SinkRecord record : sinkRecords) {
Expand Down Expand Up @@ -72,6 +73,7 @@ public List<SinkRecord> transform(String containerName, List<SinkRecord> sinkRec
return toBeWrittenRecordList;
}

@SuppressWarnings("unchecked")
private void maybeInsertId(Object recordValue, SinkRecord sinkRecord) {
if (!(recordValue instanceof Map)) {
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ public static Map<String, Object> toJsonMap(Struct struct) {
return jsonMap;
}

@SuppressWarnings("unchecked")
public static Map<String, Object> handleMap(Map<String, Object> map) {
if (map == null) {
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,9 @@
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
Expand Down Expand Up @@ -46,47 +47,47 @@ private void returnOnKeyOrValue(

@Test(groups = { "unit" }, dataProvider = "idStrategyParameterProvider", expectedExceptions = ConnectException.class, timeOut = TIMEOUT)
public void valueNotStructOrMapShouldFail(IdStrategy idStrategy) {
idStrategy.configure(Map.of());
idStrategy.configure(new HashMap<>());
SinkRecord sinkRecord = Mockito.mock(SinkRecord.class);
returnOnKeyOrValue(Schema.STRING_SCHEMA, "a string", idStrategy, sinkRecord);
idStrategy.generateId(sinkRecord);
}

@Test(groups = { "unit" }, dataProvider = "idStrategyParameterProvider", expectedExceptions = ConnectException.class, timeOut = TIMEOUT)
public void noIdInValueShouldFail(IdStrategy idStrategy) {
idStrategy.configure(Map.of());
idStrategy.configure(new HashMap<>());
SinkRecord sinkRecord = Mockito.mock(SinkRecord.class);
returnOnKeyOrValue(null, Map.of(), idStrategy, sinkRecord);
returnOnKeyOrValue(null, new HashMap<>(), idStrategy, sinkRecord);
idStrategy.generateId(sinkRecord);
}

@Test(groups = { "unit" }, dataProvider = "idStrategyParameterProvider", timeOut = TIMEOUT)
public void stringIdOnMapShouldReturn(IdStrategy idStrategy) {
idStrategy.configure(Map.of());
idStrategy.configure(new HashMap<>());
SinkRecord sinkRecord = Mockito.mock(SinkRecord.class);
returnOnKeyOrValue(
null,
Map.of("id", "1234567"),
new HashMap<String, Object>(){{ put("id", "1234567"); }},
idStrategy,
sinkRecord);
assertThat("1234567").isEqualTo(idStrategy.generateId(sinkRecord));
}

@Test(groups = { "unit" }, dataProvider = "idStrategyParameterProvider", timeOut = TIMEOUT)
public void nonStringIdOnMapShouldReturn(IdStrategy idStrategy) {
idStrategy.configure(Map.of());
idStrategy.configure(new HashMap<>());
SinkRecord sinkRecord = Mockito.mock(SinkRecord.class);
returnOnKeyOrValue(
null,
Map.of("id", 1234567),
new HashMap<String, Object>(){{ put("id", 1234567); }},
idStrategy,
sinkRecord);
assertThat("1234567").isEqualTo(idStrategy.generateId(sinkRecord));
}

@Test(groups = { "unit" }, dataProvider = "idStrategyParameterProvider", timeOut = TIMEOUT)
public void stringIdOnStructShouldReturn(IdStrategy idStrategy) {
idStrategy.configure(Map.of());
idStrategy.configure(new HashMap<>());
Schema schema = SchemaBuilder.struct()
.field("id", Schema.STRING_SCHEMA)
.build();
Expand All @@ -100,7 +101,7 @@ public void stringIdOnStructShouldReturn(IdStrategy idStrategy) {

@Test(groups = { "unit" }, dataProvider = "idStrategyParameterProvider", timeOut = TIMEOUT)
public void structIdOnStructShouldReturn(IdStrategy idStrategy) {
idStrategy.configure(Map.of());
idStrategy.configure(new HashMap<>());
Schema idSchema = SchemaBuilder.struct()
.field("name", Schema.STRING_SCHEMA)
.build();
Expand All @@ -117,8 +118,7 @@ public void structIdOnStructShouldReturn(IdStrategy idStrategy) {

@Test(groups = { "unit" }, dataProvider = "idStrategyParameterProvider", timeOut = TIMEOUT)
public void jsonPathOnStruct(IdStrategy idStrategy) {
idStrategy.configure(Map.of());
idStrategy.configure(Map.of(ProvidedInConfig.JSON_PATH_CONFIG, "$.id.name"));
idStrategy.configure(new HashMap<String, Object>(){{ put(ProvidedInConfig.JSON_PATH_CONFIG, "$.id.name"); }});

Schema idSchema = SchemaBuilder.struct()
.field("name", Schema.STRING_SCHEMA)
Expand All @@ -136,26 +136,24 @@ public void jsonPathOnStruct(IdStrategy idStrategy) {

@Test(groups = { "unit" }, dataProvider = "idStrategyParameterProvider", timeOut = TIMEOUT)
public void jsonPathOnMap(IdStrategy idStrategy) {
idStrategy.configure(Map.of());
idStrategy.configure(Map.of(ProvidedInConfig.JSON_PATH_CONFIG, "$.id.name"));
idStrategy.configure(new HashMap<String, Object>(){{ put(ProvidedInConfig.JSON_PATH_CONFIG, "$.id.name"); }});

SinkRecord sinkRecord = Mockito.mock(SinkRecord.class);
returnOnKeyOrValue(
null,
Map.of("id", Map.of("name", "franz kafka")),
new HashMap<String, Object>(){{ put("id", new HashMap<String, Object>(){{ put("name", "franz kafka"); }}); }},
idStrategy,
sinkRecord);
assertThat("franz kafka").isEqualTo(idStrategy.generateId(sinkRecord));
}

@Test(groups = { "unit" }, dataProvider = "idStrategyParameterProvider", expectedExceptions = ConnectException.class, timeOut = TIMEOUT)
public void invalidJsonPathThrows(IdStrategy idStrategy) {
idStrategy.configure(Map.of());
idStrategy.configure(Map.of(ProvidedInConfig.JSON_PATH_CONFIG, "invalid.path"));
idStrategy.configure(new HashMap<String, Object>(){{ put(ProvidedInConfig.JSON_PATH_CONFIG, "invalid.path"); }});
SinkRecord sinkRecord = Mockito.mock(SinkRecord.class);
returnOnKeyOrValue(
null,
Map.of("id", Map.of("name", "franz kafka")),
new HashMap<String, Object>(){{ put("id", new HashMap<String, Object>(){{ put("name", "franz kafka"); }}); }},
idStrategy,
sinkRecord);

Expand All @@ -164,12 +162,11 @@ public void invalidJsonPathThrows(IdStrategy idStrategy) {

@Test(groups = { "unit" }, dataProvider = "idStrategyParameterProvider", expectedExceptions = ConnectException.class, timeOut = TIMEOUT)
public void jsonPathNotExistThrows(IdStrategy idStrategy) {
idStrategy.configure(Map.of());
idStrategy.configure(Map.of(ProvidedInConfig.JSON_PATH_CONFIG, "$.id.not.exist"));
idStrategy.configure(new HashMap<String, Object>(){{ put(ProvidedInConfig.JSON_PATH_CONFIG, "$.id.not.exist"); }});
SinkRecord sinkRecord = Mockito.mock(SinkRecord.class);
returnOnKeyOrValue(
null,
Map.of("id", Map.of("name", "franz kafka")),
new HashMap<String, Object>(){{ put("id", new HashMap<String, Object>(){{ put("name", "franz kafka"); }}); }},
idStrategy,
sinkRecord);

Expand All @@ -178,7 +175,6 @@ public void jsonPathNotExistThrows(IdStrategy idStrategy) {

@Test(groups = { "unit" }, dataProvider = "idStrategyParameterProvider", timeOut = TIMEOUT)
public void complexJsonPath(IdStrategy idStrategy) {
idStrategy.configure(Map.of());
Map<String, Object> map1 = new LinkedHashMap<>();
map1.put("id", 0);
map1.put("name", "cosmos kramer");
Expand All @@ -191,30 +187,30 @@ public void complexJsonPath(IdStrategy idStrategy) {
SinkRecord sinkRecord = Mockito.mock(SinkRecord.class);
returnOnKeyOrValue(
null,
Map.of("id", List.of(map1, map2)),
new HashMap<String, Object>() {{ put("id", Arrays.asList(map1, map2)); }},
idStrategy,
sinkRecord);

idStrategy.configure(Map.of(ProvidedInConfig.JSON_PATH_CONFIG, "$.id[0].name"));
idStrategy.configure(new HashMap<String, Object>(){{ put(ProvidedInConfig.JSON_PATH_CONFIG, "$.id[0].name"); }});
assertThat("cosmos kramer").isEqualTo(idStrategy.generateId(sinkRecord));

idStrategy.configure(Map.of(ProvidedInConfig.JSON_PATH_CONFIG, "$.id[1].name"));
idStrategy.configure(new HashMap<String, Object>(){{ put(ProvidedInConfig.JSON_PATH_CONFIG, "$.id[1].name"); }});
assertThat("franz kafka").isEqualTo(idStrategy.generateId(sinkRecord));

idStrategy.configure(Map.of(ProvidedInConfig.JSON_PATH_CONFIG, "$.id[*].id"));
idStrategy.configure(new HashMap<String, Object>(){{ put(ProvidedInConfig.JSON_PATH_CONFIG, "$.id[*].id"); }});
assertThat("[0,1]").isEqualTo(idStrategy.generateId(sinkRecord));

idStrategy.configure(Map.of(ProvidedInConfig.JSON_PATH_CONFIG, "$.id"));
idStrategy.configure(new HashMap<String, Object>(){{ put(ProvidedInConfig.JSON_PATH_CONFIG, "$.id"); }});
assertThat("[{\"id\":0,\"name\":\"cosmos kramer\",\"occupation\":\"unknown\"},{\"id\":1,\"name\":\"franz kafka\",\"occupation\":\"writer\"}]").isEqualTo(idStrategy.generateId(sinkRecord));
}

@Test(groups = { "unit" }, dataProvider = "idStrategyParameterProvider", timeOut = TIMEOUT)
public void generatedIdSanitized(IdStrategy idStrategy) {
idStrategy.configure(Map.of());
idStrategy.configure(new HashMap<>());
SinkRecord sinkRecord = Mockito.mock(SinkRecord.class);
returnOnKeyOrValue(
null,
Map.of("id", "#my/special\\id?"),
new HashMap<String, Object>() {{put("id", "#my/special\\id?");}},
idStrategy,
sinkRecord);

Expand Down

0 comments on commit ac1a96f

Please sign in to comment.