Skip to content

Commit

Permalink
test(connector): add S3SourceConnectorTest to verify
Browse files Browse the repository at this point in the history
1. add S3SourceConnectorTest to verify

Closes apache#4082
  • Loading branch information
TheR1sing3un committed Aug 14, 2023
1 parent 0dc5c05 commit 80d260c
Show file tree
Hide file tree
Showing 3 changed files with 102 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public class ConnectorConfig {
/**
* The maximum number of records that should be returned in each batch poll.
*/
private Integer batchSize;
private Integer batchSize = 20;

/**
* The maximum ms to wait for request futures to complete.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ public List<ConnectRecord> poll() {
return Collections.EMPTY_LIST;
}
long startPosition = this.position;
long endPosition = Math.max(this.fileSize, this.position + this.eachRecordSize * this.connectorConfig.getBatchSize());
long endPosition = Math.min(this.fileSize, this.position + this.eachRecordSize * this.connectorConfig.getBatchSize()) - 1;
GetObjectRequest request = GetObjectRequest.builder().bucket(this.connectorConfig.getBucket()).key(this.connectorConfig.getFileName())
.range("bytes=" + startPosition + "-" + endPosition).build();
ResponseBytes<GetObjectResponse> resp;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,17 +1,113 @@
package org.apache.eventmesh.source.connector.s3.connector;

import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.eventmesh.openconnect.api.data.ConnectRecord;
import org.apache.eventmesh.source.connector.s3.config.ConnectorConfig;
import org.apache.eventmesh.source.connector.s3.config.S3SourceConfig;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.core.async.AsyncRequestBody;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;

@Test
@Ignore
public class S3SourceConnectorTest {

S3AsyncClient s3AsyncClient;
private static final S3SourceConfig sourceConfig;

private static final ConnectorConfig connectorConfig;

private static final Map<String, Integer> schema;

private static final int eachRecordSize;

static {
sourceConfig = new S3SourceConfig();
connectorConfig = new ConnectorConfig();
connectorConfig.setConnectorName("S3SourceConnector");
connectorConfig.setRegion("ap-southeast-1");
connectorConfig.setBucket("event-mesh-bucket");
connectorConfig.setAccessKey("access-key");
connectorConfig.setSecretKey("secret-key");

connectorConfig.setFileName("test-file");

schema = new HashMap<>();
schema.put("id", 4);
schema.put("body", 16);
schema.put("time", 8);

eachRecordSize = schema.values().stream().reduce((x, y) -> x + y).orElse(0);
connectorConfig.setSchema(schema);
sourceConfig.setConnectorConfig(connectorConfig);
}

private S3AsyncClient s3Client;

@Before
private void setUp() {
s3AsyncClient =
public void setUp() throws Exception {
AwsBasicCredentials basicCredentials = AwsBasicCredentials.create(this.connectorConfig.getAccessKey(),
this.connectorConfig.getSecretKey());
this.s3Client = S3AsyncClient.builder().credentialsProvider(() -> basicCredentials)
.region(Region.of(this.connectorConfig.getRegion())).build();

// write mocked data
this.writeMockedRecords(200);
}

@After
public void tearDown() throws Exception {
// clear file
this.s3Client.deleteObject(builder -> builder.bucket(this.connectorConfig.getBucket())
.key(this.connectorConfig.getFileName()).build()).get();
}

@Test
public void testS3SourceConnector() throws Exception {
S3SourceConnector s3SourceConnector = new S3SourceConnector();
s3SourceConnector.init(sourceConfig);
s3SourceConnector.start();
int expectedId = 0;
while (true) {
List<ConnectRecord> connectRecords = s3SourceConnector.poll();
if (connectRecords.isEmpty()) {
break;
}
Assert.assertEquals(20, connectRecords.size());
for (ConnectRecord connectRecord : connectRecords) {
byte[] data = (byte[]) connectRecord.getData();
Assert.assertEquals(eachRecordSize, data.length);
ByteBuffer byteBuffer = ByteBuffer.wrap(data);
int id = byteBuffer.getInt();
Assert.assertEquals(expectedId++, id);
}
}

}

private void writeMockedRecords(int count) throws Exception {
ByteBuffer bytes = ByteBuffer.allocate(count * eachRecordSize);
ByteBuffer body = ByteBuffer.allocate(16);
body.putLong(13L);
body.putLong(13L);
body.flip();
for (int i = 0; i < count; i++) {
bytes.putInt(i);
bytes.put(body);
body.flip();
bytes.putLong(System.currentTimeMillis());
}
PutObjectRequest putObjectRequest = PutObjectRequest.builder().bucket(connectorConfig.getBucket()).key(connectorConfig.getFileName()).build();
AsyncRequestBody requestBody = AsyncRequestBody.fromBytes(bytes.array());
this.s3Client.putObject(putObjectRequest, requestBody).get();
}

}

0 comments on commit 80d260c

Please sign in to comment.