From 80d260c48b53df85cbf8ec806c4aa1c3575a4a68 Mon Sep 17 00:00:00 2001 From: TheR1sing3un Date: Sun, 18 Jun 2023 01:35:48 +0800 Subject: [PATCH] test(connector): add S3SourceConnectorTest to verify 1. add S3SourceConnectorTest to verify Closes https://github.com/apache/eventmesh/issues/4082 --- .../connector/s3/config/ConnectorConfig.java | 2 +- .../s3/connector/S3SourceConnector.java | 2 +- .../s3/connector/S3SourceConnectorTest.java | 104 +++++++++++++++++- 3 files changed, 102 insertions(+), 6 deletions(-) diff --git a/eventmesh-connectors/source-connector-s3/src/main/java/org/apache/eventmesh/source/connector/s3/config/ConnectorConfig.java b/eventmesh-connectors/source-connector-s3/src/main/java/org/apache/eventmesh/source/connector/s3/config/ConnectorConfig.java index 9c79ea67e8..732ba69140 100644 --- a/eventmesh-connectors/source-connector-s3/src/main/java/org/apache/eventmesh/source/connector/s3/config/ConnectorConfig.java +++ b/eventmesh-connectors/source-connector-s3/src/main/java/org/apache/eventmesh/source/connector/s3/config/ConnectorConfig.java @@ -45,7 +45,7 @@ public class ConnectorConfig { /** * The maximum number of records that should be returned in each batch poll. */ - private Integer batchSize; + private Integer batchSize = 20; /** * The maximum ms to wait for request futures to complete. diff --git a/eventmesh-connectors/source-connector-s3/src/main/java/org/apache/eventmesh/source/connector/s3/connector/S3SourceConnector.java b/eventmesh-connectors/source-connector-s3/src/main/java/org/apache/eventmesh/source/connector/s3/connector/S3SourceConnector.java index b8e9de6c18..3a12b9d634 100644 --- a/eventmesh-connectors/source-connector-s3/src/main/java/org/apache/eventmesh/source/connector/s3/connector/S3SourceConnector.java +++ b/eventmesh-connectors/source-connector-s3/src/main/java/org/apache/eventmesh/source/connector/s3/connector/S3SourceConnector.java @@ -121,7 +121,7 @@ public List poll() { return Collections.EMPTY_LIST; } long startPosition = this.position; - long endPosition = Math.max(this.fileSize, this.position + this.eachRecordSize * this.connectorConfig.getBatchSize()); + long endPosition = Math.min(this.fileSize, this.position + this.eachRecordSize * this.connectorConfig.getBatchSize()) - 1; GetObjectRequest request = GetObjectRequest.builder().bucket(this.connectorConfig.getBucket()).key(this.connectorConfig.getFileName()) .range("bytes=" + startPosition + "-" + endPosition).build(); ResponseBytes resp; diff --git a/eventmesh-connectors/source-connector-s3/src/test/java/org/apache/eventmesh/source/connector/s3/connector/S3SourceConnectorTest.java b/eventmesh-connectors/source-connector-s3/src/test/java/org/apache/eventmesh/source/connector/s3/connector/S3SourceConnectorTest.java index 4f65589b9c..82e2922342 100644 --- a/eventmesh-connectors/source-connector-s3/src/test/java/org/apache/eventmesh/source/connector/s3/connector/S3SourceConnectorTest.java +++ b/eventmesh-connectors/source-connector-s3/src/test/java/org/apache/eventmesh/source/connector/s3/connector/S3SourceConnectorTest.java @@ -1,17 +1,113 @@ package org.apache.eventmesh.source.connector.s3.connector; +import java.nio.ByteBuffer; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.eventmesh.openconnect.api.data.ConnectRecord; +import org.apache.eventmesh.source.connector.s3.config.ConnectorConfig; +import org.apache.eventmesh.source.connector.s3.config.S3SourceConfig; +import org.junit.After; +import org.junit.Assert; import org.junit.Before; +import org.junit.Ignore; import org.junit.Test; +import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; +import software.amazon.awssdk.core.async.AsyncRequestBody; +import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.s3.S3AsyncClient; +import software.amazon.awssdk.services.s3.model.PutObjectRequest; -@Test +@Ignore public class S3SourceConnectorTest { - S3AsyncClient s3AsyncClient; + private static final S3SourceConfig sourceConfig; + + private static final ConnectorConfig connectorConfig; + + private static final Map schema; + + private static final int eachRecordSize; + + static { + sourceConfig = new S3SourceConfig(); + connectorConfig = new ConnectorConfig(); + connectorConfig.setConnectorName("S3SourceConnector"); + connectorConfig.setRegion("ap-southeast-1"); + connectorConfig.setBucket("event-mesh-bucket"); + connectorConfig.setAccessKey("access-key"); + connectorConfig.setSecretKey("secret-key"); + + connectorConfig.setFileName("test-file"); + + schema = new HashMap<>(); + schema.put("id", 4); + schema.put("body", 16); + schema.put("time", 8); + + eachRecordSize = schema.values().stream().reduce((x, y) -> x + y).orElse(0); + connectorConfig.setSchema(schema); + sourceConfig.setConnectorConfig(connectorConfig); + } + + private S3AsyncClient s3Client; @Before - private void setUp() { - s3AsyncClient = + public void setUp() throws Exception { + AwsBasicCredentials basicCredentials = AwsBasicCredentials.create(this.connectorConfig.getAccessKey(), + this.connectorConfig.getSecretKey()); + this.s3Client = S3AsyncClient.builder().credentialsProvider(() -> basicCredentials) + .region(Region.of(this.connectorConfig.getRegion())).build(); + + // write mocked data + this.writeMockedRecords(200); + } + + @After + public void tearDown() throws Exception { + // clear file + this.s3Client.deleteObject(builder -> builder.bucket(this.connectorConfig.getBucket()) + .key(this.connectorConfig.getFileName()).build()).get(); + } + + @Test + public void testS3SourceConnector() throws Exception { + S3SourceConnector s3SourceConnector = new S3SourceConnector(); + s3SourceConnector.init(sourceConfig); + s3SourceConnector.start(); + int expectedId = 0; + while (true) { + List connectRecords = s3SourceConnector.poll(); + if (connectRecords.isEmpty()) { + break; + } + Assert.assertEquals(20, connectRecords.size()); + for (ConnectRecord connectRecord : connectRecords) { + byte[] data = (byte[]) connectRecord.getData(); + Assert.assertEquals(eachRecordSize, data.length); + ByteBuffer byteBuffer = ByteBuffer.wrap(data); + int id = byteBuffer.getInt(); + Assert.assertEquals(expectedId++, id); + } + } + + } + + private void writeMockedRecords(int count) throws Exception { + ByteBuffer bytes = ByteBuffer.allocate(count * eachRecordSize); + ByteBuffer body = ByteBuffer.allocate(16); + body.putLong(13L); + body.putLong(13L); + body.flip(); + for (int i = 0; i < count; i++) { + bytes.putInt(i); + bytes.put(body); + body.flip(); + bytes.putLong(System.currentTimeMillis()); + } + PutObjectRequest putObjectRequest = PutObjectRequest.builder().bucket(connectorConfig.getBucket()).key(connectorConfig.getFileName()).build(); + AsyncRequestBody requestBody = AsyncRequestBody.fromBytes(bytes.array()); + this.s3Client.putObject(putObjectRequest, requestBody).get(); } }