From a4edfff94a285ea74232cc8d84ca1a3e16a773d1 Mon Sep 17 00:00:00 2001 From: Nir Tsruya Date: Mon, 22 Feb 2021 10:36:29 +0100 Subject: [PATCH 1/2] sink refactor fix unprocessed items --- .../com/klarna/flink/connectors/dynamodb/DynamoDBProducer.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/main/java/com/klarna/flink/connectors/dynamodb/DynamoDBProducer.java b/src/main/java/com/klarna/flink/connectors/dynamodb/DynamoDBProducer.java index f4140a1..ca91c48 100644 --- a/src/main/java/com/klarna/flink/connectors/dynamodb/DynamoDBProducer.java +++ b/src/main/java/com/klarna/flink/connectors/dynamodb/DynamoDBProducer.java @@ -241,7 +241,8 @@ protected Callable batchWrite(final BatchRequest batchRequest) { t = null; try { final BatchWriteItemResponse batchWriteItemResponse = dynamoDbClient.batchWriteItem(batchWriteItemRequest); - if (batchWriteItemResponse.hasUnprocessedItems()) { + Map> unprocessedItems = batchWriteItemResponse.unprocessedItems(); + if (!unprocessedItems.isEmpty()) { retry = true; batchWriteItemRequest = BatchWriteItemRequest.builder() .requestItems(batchWriteItemResponse.unprocessedItems()) From 5b3e6b959362b0f88a64e834733e7016544773e3 Mon Sep 17 00:00:00 2001 From: Nir Tsruya Date: Mon, 22 Feb 2021 10:36:53 +0100 Subject: [PATCH 2/2] sink refactor fix unprocessed items --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index b77565b..4de5807 100644 --- a/pom.xml +++ b/pom.xml @@ -5,7 +5,7 @@ 4.0.0 jar com.klarna - 1.1.0 + 1.1.1 flink-connector-dynamodb Flink DynamoDB Sink