From c7bc7ef147fe28f8ec78c182c0334456ab2b644f Mon Sep 17 00:00:00 2001 From: wgzhao Date: Fri, 20 Sep 2024 15:50:36 +0800 Subject: [PATCH] [improve][plugin][s3writer] Improve performance by refactoring code 1. Use Java V2 API 2. Utilize `prefix` to get objects instead of `listObjects` to reduce the number of requests 3. Add a random string before the file suffix to preserve the filename; for example, change the filename 'upload.csv' to 'upload_.csv', the previous file name will be `uplaod.csv_` 4. Optimize code --- docs/assets/jobs/s3writer.json | 86 +++++++------- .../plugin/writer/s3writer/S3Writer.java | 107 +++++++++--------- 2 files changed, 97 insertions(+), 96 deletions(-) diff --git a/docs/assets/jobs/s3writer.json b/docs/assets/jobs/s3writer.json index c135826d5..51b99612d 100644 --- a/docs/assets/jobs/s3writer.json +++ b/docs/assets/jobs/s3writer.json @@ -6,51 +6,49 @@ "channel": 1 } }, - "content": [ - { - "reader": { - "name": "streamreader", - "parameter": { - "column": [ - { - "value": "DataX", - "type": "string" - }, - { - "value": 19890604, - "type": "long" - }, - { - "value": "1989-06-04 11:22:33", - "type": "date" - }, - { - "value": true, - "type": "bool" - }, - { - "value": "test", - "type": "bytes" - } - ], - "sliceRecordCount": 10 - } - }, - "writer": { - "name": "s3writer", - "parameter": { - "endpoint": "https://s3.amazonaws.com", - "accessId": "xxxxxxxxxxxx", - "accessKey": "xxxxxxxxxxxxxxxxxxxxxxx", - "bucket": "test", - "object": "upload.csv", - "region": "ap-northeast-1", - "encoding": "", - "fieldDelimiter": ",", - "writeMode": "truncate" - } + "content": { + "reader": { + "name": "streamreader", + "parameter": { + "column": [ + { + "value": "Addax", + "type": "string" + }, + { + "value": 19890604, + "type": "long" + }, + { + "value": "1989-06-04 11:22:33", + "type": "date" + }, + { + "value": true, + "type": "bool" + }, + { + "value": "test", + "type": "bytes" + } + ], + "sliceRecordCount": 10 + } + }, + "writer": { + "name": "s3writer", + "parameter": { + "endpoint": "https://s3.amazonaws.com", + "accessId": "xxxxxxxxxxxx", + "accessKey": "xxxxxxxxxxxxxxxxxxxxxxx", + "bucket": "test", + "object": "upload.csv", + "region": "ap-northeast-1", + "encoding": "", + "fieldDelimiter": ",", + "writeMode": "truncate" } } - ] + } } } diff --git a/plugin/writer/s3writer/src/main/java/com/wgzhao/addax/plugin/writer/s3writer/S3Writer.java b/plugin/writer/s3writer/src/main/java/com/wgzhao/addax/plugin/writer/s3writer/S3Writer.java index f74ff7c83..1514c8523 100644 --- a/plugin/writer/s3writer/src/main/java/com/wgzhao/addax/plugin/writer/s3writer/S3Writer.java +++ b/plugin/writer/s3writer/src/main/java/com/wgzhao/addax/plugin/writer/s3writer/S3Writer.java @@ -22,6 +22,8 @@ import software.amazon.awssdk.services.s3.model.DeleteObjectsRequest; import software.amazon.awssdk.services.s3.model.ListObjectsRequest; import software.amazon.awssdk.services.s3.model.ListObjectsResponse; +import software.amazon.awssdk.services.s3.model.ListObjectsV2Request; +import software.amazon.awssdk.services.s3.model.ListObjectsV2Response; import software.amazon.awssdk.services.s3.model.ObjectIdentifier; import software.amazon.awssdk.services.s3.model.S3Exception; import software.amazon.awssdk.services.s3.model.S3Object; @@ -58,6 +60,14 @@ public void init() this.s3Client = S3Util.initS3Client(this.writerSliceConfig); } + @Override + public void destroy() + { + if (this.s3Client != null) { + this.s3Client.close(); + } + } + private void validateParameter() { this.writerSliceConfig.getNecessaryValue(S3Key.REGION, S3WriterErrorCode.REQUIRED_VALUE); @@ -83,7 +93,7 @@ public void prepare() deleteBucketObjects(bucket, object); } else if ("nonConflict".equals(writeMode)) { - LOG.info("Begin check exists objects which starts with [%s] in bucket [{}] or not Bucket [{}] ", bucket, object); + LOG.info("Begin to check for existing objects that starts with [{}] in bucket [{}]", object, bucket); List objs = listObjects(bucket, object); if (!objs.isEmpty()) { LOG.error("There have {} objects starts with {} in bucket {} ", objs.size(), object, bucket); @@ -92,18 +102,6 @@ else if ("nonConflict".equals(writeMode)) { } } - @Override - public void post() - { - - } - - @Override - public void destroy() - { - - } - @Override public List split(int mandatoryNumber) { @@ -111,30 +109,34 @@ public List split(int mandatoryNumber) List writerSplitConfigs = new ArrayList<>(); String object = this.writerSliceConfig.getString(S3Key.OBJECT); String bucket = this.writerSliceConfig.getString(S3Key.BUCKET); - + String objectName = object; + String objectSuffix = null; + // if the object has suffix, it should separate the object name + if (object.contains(".")) { + objectName = object.split("\\.", -1)[0]; + objectSuffix = "." + object.split("\\.", -1)[1]; + } Set allObjects = new HashSet<>(); for (S3Object obj : listObjects(bucket, object)) { allObjects.add(obj.key()); } - - String objectSuffix; + String fullObjectName; for (int i = 0; i < mandatoryNumber; i++) { // handle same object name - Configuration splitedTaskConfig = this.writerSliceConfig.clone(); - - String fullObjectName; - objectSuffix = StringUtils.replace(UUID.randomUUID().toString(), "-", ""); - fullObjectName = String.format("%s_%s", object, objectSuffix); - while (allObjects.contains(fullObjectName)) { - objectSuffix = StringUtils.replace(UUID.randomUUID().toString(), "-", ""); - fullObjectName = String.format("%s_%s", object, objectSuffix); + Configuration splitTaskConfig = this.writerSliceConfig.clone(); + do { + fullObjectName = String.format("%s_%s%s", objectName, + StringUtils.replace(UUID.randomUUID().toString(), "-", ""), + objectSuffix + ); } + while (allObjects.contains(fullObjectName)); allObjects.add(fullObjectName); - splitedTaskConfig.set(S3Key.OBJECT, fullObjectName); - LOG.info(String.format("split write object name:[%s]", fullObjectName)); + splitTaskConfig.set(S3Key.OBJECT, fullObjectName); + LOG.info("split write object name:[{}]", fullObjectName); - writerSplitConfigs.add(splitedTaskConfig); + writerSplitConfigs.add(splitTaskConfig); } LOG.info("end do split."); return writerSplitConfigs; @@ -149,17 +151,26 @@ public List split(int mandatoryNumber) */ private List listObjects(String bucket, String objectName) { - ListObjectsRequest listObjects = ListObjectsRequest + String suffix = null; + if (objectName.contains(".")) { + suffix = "." + objectName.split("\\.", -1)[1]; + objectName = objectName.split("\\.", -1)[0]; + } + ListObjectsV2Request listObjects = ListObjectsV2Request .builder() .bucket(bucket) + .prefix(objectName) .build(); - ListObjectsResponse res = s3Client.listObjects(listObjects); + ListObjectsV2Response res = s3Client.listObjectsV2(listObjects); List objects = res.contents(); List result = new ArrayList<>(); for (S3Object obj : objects) { - if (obj.key().startsWith(objectName)) { + if (suffix == null) { + result.add(obj); + } + else if (obj.key().endsWith(suffix)) { result.add(obj); } } @@ -176,7 +187,7 @@ private void deleteBucketObjects(String bucket, String objectName) { List objects = listObjects(bucket, objectName); ArrayList toDelete = new ArrayList<>(); - if ( !objects.isEmpty()) { + if (!objects.isEmpty()) { for (S3Object obj : objects) { toDelete.add(ObjectIdentifier.builder().key(obj.key()).build()); } @@ -232,12 +243,12 @@ public void startWrite(RecordReceiver lineReceiver) { // 设置每块字符串长度 final int partSize = 1024 * 1024 * 10; - long numberCacual = (this.maxFileSize * 1024 * 1024L) / partSize; - final long maxPartNumber = numberCacual >= 1 ? numberCacual : 1; + long numberCalc = (this.maxFileSize * 1024 * 1024L) / partSize; + final long maxPartNumber = numberCalc >= 1 ? numberCalc : 1; //warn: may be StringBuffer->StringBuilder Record record; - LOG.info(String.format("begin do write, each object maxFileSize: [%s]MB...", maxPartNumber * 10)); + LOG.info("Begin do write, each object's max file size is {}MB...", maxPartNumber * 10); // First create a multipart upload and get the upload id CreateMultipartUploadRequest createMultipartUploadRequest = CreateMultipartUploadRequest.builder() .bucket(bucket) @@ -254,12 +265,12 @@ public void startWrite(RecordReceiver lineReceiver) .key(object) .uploadId(uploadId) .partNumber(currPart).build(); - ByteArrayOutputStream outputStream = new ByteArrayOutputStream( ); + ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); Charset charset = Charset.forName(encoding); boolean needInit = true; while ((record = lineReceiver.getFromReader()) != null) { try { - if (needInit && ! header.isEmpty()) { + if (needInit && !header.isEmpty()) { // write header outputStream.write(String.join(String.valueOf(fieldDelimiter), header).getBytes(charset)); outputStream.write("\n".getBytes(charset)); @@ -312,10 +323,11 @@ public void startWrite(RecordReceiver lineReceiver) LOG.info("end do write"); } - private String record2String(Record record) { + private String record2String(Record record) + { StringJoiner sj = new StringJoiner(this.fieldDelimiter + ""); int columnNum = record.getColumnNumber(); - for (int i=0; i < columnNum; i++) { + for (int i = 0; i < columnNum; i++) { Column column = record.getColumn(i); if (column == null || column.asString() == null) { sj.add(this.nullFormat); @@ -325,29 +337,20 @@ private String record2String(Record record) { if (type == Column.Type.DATE) { SimpleDateFormat sdf = new SimpleDateFormat(this.dateFormat); sj.add(sdf.format(column.asDate())); - } else { + } + else { sj.add(column.asString()); } } return sj.toString(); } - @Override - public void prepare() - { - - } - - @Override - public void post() - { - - } - @Override public void destroy() { - + if (this.s3Client != null) { + this.s3Client.close(); + } } } }