[build-fix] lint failure
sudeshwasnik committed Jan 15, 2025
1 parent 16d208a commit 05e6910
Showing 3 changed files with 23 additions and 21 deletions.
Changed file 1 of 3

@@ -659,9 +659,9 @@ public InsertRowsResponse get() throws Throwable {
       // preserve the original order, for anything after the first schema mismatch error we will
       // retry after the evolution
       SinkRecord originalSinkRecord = originalSinkRecords.get(idx);
-      InsertValidationResponse response = this.channel.insertRow(
-          records.get(idx), Long.toString(originalSinkRecord.kafkaOffset())
-      );
+      InsertValidationResponse response =
+          this.channel.insertRow(
+              records.get(idx), Long.toString(originalSinkRecord.kafkaOffset()));
       if (response.hasErrors()) {
         InsertValidationResponse.InsertError insertError = response.getInsertErrors().get(0);
         SchemaEvolutionTargetItems schemaEvolutionTargetItems =
@@ -686,9 +686,7 @@ public InsertRowsResponse get() throws Throwable {
         } else {
           LOGGER.info("Triggering schema evolution. Items: {}", schemaEvolutionTargetItems);
           schemaEvolutionService.evolveSchemaIfNeeded(
-              schemaEvolutionTargetItems,
-              originalSinkRecord,
-              channel.getTableSchema());
+              schemaEvolutionTargetItems, originalSinkRecord, channel.getTableSchema());
           // Offset reset needed since it's possible that we successfully ingested partial batch
           needToResetOffset = true;
           break;
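For context, these hunks sit inside the connector's insert loop, where a row that fails with a schema mismatch triggers schema evolution, marks the offset for reset, and stops the batch so it can be retried. The sketch below is a minimal, self-contained illustration of that control flow only; the RowSink interface and evolveSchema helper are hypothetical stand-ins, not the connector's API.

```java
import java.util.List;

/** Hypothetical stand-in for the channel used by the real connector. */
interface RowSink {
  /** Inserts one row; returns true if the row failed with a schema mismatch. */
  boolean insertRowHasSchemaError(Object row, String offsetToken);
}

final class InsertLoopSketch {
  /** Returns true when the caller should reset the offset and retry the batch. */
  static boolean insertBatch(List<Object> rows, List<Long> offsets, RowSink sink) {
    boolean needToResetOffset = false;
    for (int idx = 0; idx < rows.size(); idx++) {
      // Preserve the original order; anything after the first schema mismatch
      // is only retried after the schema evolution has been applied.
      boolean schemaMismatch =
          sink.insertRowHasSchemaError(rows.get(idx), Long.toString(offsets.get(idx)));
      if (schemaMismatch) {
        evolveSchema(); // stand-in for schemaEvolutionService.evolveSchemaIfNeeded(...)
        // Offset reset needed since part of the batch may already have been ingested.
        needToResetOffset = true;
        break;
      }
    }
    return needToResetOffset;
  }

  private static void evolveSchema() {
    // In the real connector this would add or adjust the mismatched columns in the target table.
  }
}
```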
Changed file 2 of 3

@@ -723,9 +723,12 @@ public static List<SinkRecord> createBlankJsonSinkRecords(
       final String topicName,
       final int partitionNo) {
     return createJsonRecords(
-        startOffset, noOfRecords, topicName, partitionNo, null,
-        Collections.singletonMap("schemas.enable", Boolean.toString(false))
-    );
+        startOffset,
+        noOfRecords,
+        topicName,
+        partitionNo,
+        null,
+        Collections.singletonMap("schemas.enable", Boolean.toString(false)));
   }

   /* Generate (noOfRecords - startOffset) for a given topic and partition. */
@@ -735,10 +738,12 @@ public static List<SinkRecord> createNativeJsonSinkRecords(
       final String topicName,
       final int partitionNo) {
     return createJsonRecords(
-        startOffset, noOfRecords, topicName, partitionNo,
+        startOffset,
+        noOfRecords,
+        topicName,
+        partitionNo,
         TestUtils.JSON_WITH_SCHEMA.getBytes(StandardCharsets.UTF_8),
-        Collections.singletonMap("schemas.enable", Boolean.toString(true))
-    );
+        Collections.singletonMap("schemas.enable", Boolean.toString(true)));
   }

   private static List<SinkRecord> createJsonRecords(
@@ -747,8 +752,7 @@ private static List<SinkRecord> createJsonRecords(
       final String topicName,
       final int partitionNo,
       byte[] value,
-      Map<String, String> converterConfig
-  ) {
+      Map<String, String> converterConfig) {
     JsonConverter converter = new JsonConverter();
     converter.configure(converterConfig, false);
     SchemaAndValue schemaInputValue = converter.toConnectData("test", value);
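The second file is a test helper: createBlankJsonSinkRecords and createNativeJsonSinkRecords both delegate to createJsonRecords, which configures a Kafka Connect JsonConverter from the supplied converter config and turns the value bytes into a SchemaAndValue. The sketch below shows one plausible shape of such a helper using only the public Kafka Connect API; it is not the connector's TestUtils implementation, and the record loop, constructor choices, and sample payload are assumptions.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.json.JsonConverter;
import org.apache.kafka.connect.sink.SinkRecord;

final class JsonSinkRecordSketch {
  static List<SinkRecord> createJsonRecords(
      int startOffset,
      int noOfRecords,
      String topicName,
      int partitionNo,
      byte[] value,
      Map<String, String> converterConfig) {
    JsonConverter converter = new JsonConverter();
    converter.configure(converterConfig, false); // false = configure as a value converter
    SchemaAndValue schemaInputValue = converter.toConnectData("test", value);

    List<SinkRecord> records = new ArrayList<>();
    for (long offset = startOffset; offset < startOffset + noOfRecords; offset++) {
      records.add(
          new SinkRecord(
              topicName,
              partitionNo,
              null, // key schema
              null, // key
              schemaInputValue.schema(),
              schemaInputValue.value(),
              offset));
    }
    return records;
  }

  public static void main(String[] args) {
    // Usage mirroring createNativeJsonSinkRecords: schema-bearing JSON with schemas.enable=true.
    // The payload below is a made-up example, not the connector's JSON_WITH_SCHEMA constant.
    byte[] payload =
        "{\"schema\":{\"type\":\"int32\",\"optional\":false},\"payload\":42}"
            .getBytes(StandardCharsets.UTF_8);
    List<SinkRecord> records =
        createJsonRecords(
            0, 3, "test_topic", 0,
            payload,
            Collections.singletonMap("schemas.enable", Boolean.toString(true)));
    System.out.println(records.size() + " records created");
  }
}
```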
Changed file 3 of 3

@@ -1,16 +1,17 @@
 package com.snowflake.kafka.connector.internal.streaming;

+import static com.snowflake.kafka.connector.internal.streaming.ChannelMigrationResponseCode.SUCCESS;
+import static com.snowflake.kafka.connector.internal.streaming.ChannelMigrationResponseCode.isChannelMigrationResponseSuccessful;
+import static com.snowflake.kafka.connector.internal.streaming.channel.TopicPartitionChannel.NO_OFFSET_TOKEN_REGISTERED_IN_SNOWFLAKE;
+
 import com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig;
 import com.snowflake.kafka.connector.Utils;
 import com.snowflake.kafka.connector.dlq.InMemoryKafkaRecordErrorReporter;
 import com.snowflake.kafka.connector.internal.SnowflakeConnectionService;
 import com.snowflake.kafka.connector.internal.SnowflakeSinkService;
 import com.snowflake.kafka.connector.internal.SnowflakeSinkServiceFactory;
 import com.snowflake.kafka.connector.internal.TestUtils;
-import static com.snowflake.kafka.connector.internal.streaming.ChannelMigrationResponseCode.SUCCESS;
-import static com.snowflake.kafka.connector.internal.streaming.ChannelMigrationResponseCode.isChannelMigrationResponseSuccessful;
 import com.snowflake.kafka.connector.internal.streaming.channel.TopicPartitionChannel;
-import static com.snowflake.kafka.connector.internal.streaming.channel.TopicPartitionChannel.NO_OFFSET_TOKEN_REGISTERED_IN_SNOWFLAKE;
 import com.snowflake.kafka.connector.internal.streaming.schemaevolution.InsertErrorMapper;
 import com.snowflake.kafka.connector.internal.streaming.schemaevolution.snowflake.SnowflakeSchemaEvolutionService;
 import com.snowflake.kafka.connector.internal.streaming.telemetry.SnowflakeTelemetryServiceV2;
@@ -506,7 +507,8 @@ public void testPartialBatchChannelInvalidationIngestion_schematization(boolean
     final long secondBatchCount = 500;

     // create 18 blank records that do not kick off schematization
-    List<SinkRecord> firstBatch = TestUtils.createBlankJsonSinkRecords(0, firstBatchCount, topic, PARTITION);
+    List<SinkRecord> firstBatch =
+        TestUtils.createBlankJsonSinkRecords(0, firstBatchCount, topic, PARTITION);
     service.insert(firstBatch);

     // send batch with 500, should kick off a record based flush and schematization on record 19,
@@ -765,9 +767,7 @@ private void testInsertRowsWithGaps(boolean withSchematization, boolean useSingl
     }

     TestUtils.assertWithRetry(
-        () -> service.getOffset(new TopicPartition(topic, PARTITION)) == 302,
-        20, 5
-    );
+        () -> service.getOffset(new TopicPartition(topic, PARTITION)) == 302, 20, 5);

     assert TestUtils.tableSize(testTableName) == 4
         : "expected: " + 4 + " actual: " + TestUtils.tableSize(testTableName);
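The last hunk collapses a TestUtils.assertWithRetry call onto one line. That helper's implementation is not part of this diff; as a rough idea of what a retry-style assertion like assertWithRetry(condition, maxAttempts, delaySeconds) usually does, here is a small self-contained sketch. The parameter meanings and the thrown AssertionError are assumptions, not the connector's actual code.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;

final class RetryAssertSketch {
  /** Polls the condition until it holds or maxAttempts is exhausted. */
  static void assertWithRetry(Callable<Boolean> condition, int maxAttempts, int delaySeconds)
      throws Exception {
    for (int attempt = 1; attempt <= maxAttempts; attempt++) {
      if (Boolean.TRUE.equals(condition.call())) {
        return; // condition eventually held
      }
      TimeUnit.SECONDS.sleep(delaySeconds);
    }
    throw new AssertionError("condition did not hold after " + maxAttempts + " attempts");
  }

  public static void main(String[] args) throws Exception {
    long deadline = System.currentTimeMillis() + 2_000;
    // Analogous to "the committed offset reached 302" in the test above.
    assertWithRetry(() -> System.currentTimeMillis() >= deadline, 20, 1);
    System.out.println("condition satisfied");
  }
}
```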
