diff --git a/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/source/AmazonDynamoDBSourceReader.java b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/source/AmazonDynamoDBSourceReader.java index afaafa3f8a9..c25f8b0e0b1 100644 --- a/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/source/AmazonDynamoDBSourceReader.java +++ b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/source/AmazonDynamoDBSourceReader.java @@ -31,11 +31,13 @@ import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.dynamodb.DynamoDbClient; +import software.amazon.awssdk.services.dynamodb.model.AttributeValue; import software.amazon.awssdk.services.dynamodb.model.ScanRequest; import software.amazon.awssdk.services.dynamodb.model.ScanResponse; import java.io.IOException; import java.net.URI; +import java.util.Map; @Slf4j public class AmazonDynamoDBSourceReader extends AbstractSingleSplitReader { @@ -78,18 +80,25 @@ public void close() throws IOException { @Override @SuppressWarnings("magicnumber") public void pollNext(Collector output) throws Exception { - ScanResponse scan = - dynamoDbClient.scan( - ScanRequest.builder() - .tableName(amazondynamodbSourceOptions.getTable()) - .build()); - if (scan.hasItems()) { - scan.items() - .forEach( - item -> { - output.collect(seaTunnelRowDeserializer.deserialize(item)); - }); - } + Map lastKeyEvaluated = null; + + ScanResponse scan; + do { + scan = + dynamoDbClient.scan( + ScanRequest.builder() + .tableName(amazondynamodbSourceOptions.getTable()) + .exclusiveStartKey(lastKeyEvaluated) + .build()); + if (scan.hasItems()) { + scan.items() + .forEach( + item -> { + output.collect(seaTunnelRowDeserializer.deserialize(item)); + }); + } + lastKeyEvaluated = scan.lastEvaluatedKey(); + } while (lastKeyEvaluated != null && !lastKeyEvaluated.isEmpty()); context.signalNoMoreElement(); } }