Skip to content

Commit

Permalink
Merge pull request #96 from Opetushallitus/OK-613__lampi-siirto-v2_SDK2
Browse files Browse the repository at this point in the history
Vaihdettu käyttöön AWS SDK V2
  • Loading branch information
augustk authored Jan 16, 2025
2 parents 604e37a + 0e777ec commit a8b1734
Show file tree
Hide file tree
Showing 7 changed files with 99 additions and 57 deletions.
7 changes: 4 additions & 3 deletions lampi-siirtaja-container/lampi-siirtaja/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,11 @@

<dependencies>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-s3</artifactId>
<version>1.12.780</version>
<groupId>software.amazon.awssdk</groupId>
<artifactId>s3</artifactId>
<version>2.30.0</version>
</dependency>

<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
package fi.oph.opintopolku.ovara;

import com.amazonaws.regions.Regions;
import fi.oph.opintopolku.ovara.config.Config;
import fi.oph.opintopolku.ovara.service.LampiSiirtajaService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.regions.Region;

public class App {

Expand All @@ -22,7 +22,7 @@ public static void main(String[] args) throws Exception {
System.getenv("DB_PASSWORD"),
System.getenv("OVARA_LAMPI_SIIRTAJA_BUCKET"),
System.getenv("LAMPI_S3_BUCKET"),
Regions.EU_WEST_1.getName(),
Region.EU_WEST_1,
"fulldump/ovara/v1/");

LampiSiirtajaService service = new LampiSiirtajaService(config);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
package fi.oph.opintopolku.ovara.config;

import software.amazon.awssdk.regions.Region;

public record Config(
String postgresHost,
Integer postgresPort,
String postgresUser,
String postgresPassword,
String ovaraS3Bucket,
String lampiS3Bucket,
String awsRegion,
Region awsRegion,
String lampiKeyPrefix) {}
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ private S3ExportResult exportTableToS3(String schemaName, String tableName) thro
String.format("select * from %s.%s", schemaName, tableName),
config.ovaraS3Bucket(),
String.format("%s.csv", tableName),
config.awsRegion());
config.awsRegion().id());

LOG.info(
"Scheman {} taulun {} vienti Ovaran S3-ämpäriin valmistui. Tulokset: {}",
Expand Down
Original file line number Diff line number Diff line change
@@ -1,18 +1,19 @@
package fi.oph.opintopolku.ovara.io;

import com.amazonaws.services.s3.model.S3ObjectInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Enumeration;
import java.util.Objects;
import java.util.function.Supplier;
import software.amazon.awssdk.core.ResponseInputStream;
import software.amazon.awssdk.services.s3.model.GetObjectResponse;

public class MultiInputStream extends InputStream {
private final Enumeration<Supplier<S3ObjectInputStream>> e;
private final Enumeration<Supplier<ResponseInputStream<GetObjectResponse>>> e;
private InputStream in;

public MultiInputStream(Enumeration<Supplier<S3ObjectInputStream>> e) {
public MultiInputStream(Enumeration<Supplier<ResponseInputStream<GetObjectResponse>>> e) {
this.e = e;
peekNextStream();
}
Expand All @@ -26,7 +27,7 @@ final void nextStream() throws IOException {

private void peekNextStream() {
if (e.hasMoreElements()) {
Supplier<S3ObjectInputStream> s = e.nextElement();
Supplier<ResponseInputStream<GetObjectResponse>> s = e.nextElement();
in = s.get();
if (in == null) throw new NullPointerException();
} else {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
package fi.oph.opintopolku.ovara.s3;

import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.services.s3.model.*;
import com.google.common.collect.Iterators;
import com.google.gson.Gson;
import fi.oph.opintopolku.ovara.config.Config;
Expand All @@ -19,6 +16,11 @@
import java.util.zip.GZIPOutputStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.auth.credentials.ContainerCredentialsProvider;
import software.amazon.awssdk.core.ResponseInputStream;
import software.amazon.awssdk.core.sync.RequestBody;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.*;

public class LampiS3Transfer {

Expand All @@ -28,23 +30,35 @@ public class LampiS3Transfer {
private static final String MANIFEST_FILENAME = "manifest.json";

private final Config config;
private final AmazonS3 ovaraS3Client;
private final AmazonS3 lampiS3Client;
private final S3Client ovaraS3Client;
private final S3Client lampiS3Client;
private final AtomicInteger uploadPartId = new AtomicInteger(0);
private final Gson gson;

private String uploadId;

public LampiS3Transfer(Config config) {
this.config = config;
this.ovaraS3Client = AmazonS3ClientBuilder.standard().build();
this.lampiS3Client = AmazonS3ClientBuilder.standard().build();
this.gson = new Gson();
this.ovaraS3Client =
S3Client.builder()
.region(config.awsRegion())
.credentialsProvider(ContainerCredentialsProvider.create())
.build();
this.lampiS3Client =
S3Client.builder()
.region(config.awsRegion())
.credentialsProvider(ContainerCredentialsProvider.create())
.build();
}

private Supplier<S3ObjectInputStream> constructSupplier(String downloadFilename) {
return () ->
ovaraS3Client.getObject(config.ovaraS3Bucket(), downloadFilename).getObjectContent();
private Supplier<ResponseInputStream<GetObjectResponse>> constructSupplier(
String downloadFilename) {
return () -> {
GetObjectRequest getObjectRequest =
GetObjectRequest.builder().bucket(config.ovaraS3Bucket()).key(downloadFilename).build();
return ovaraS3Client.getObject(getObjectRequest);
};
}

public void startGZIPCompressing(OutputStream out, InputStream in) {
Expand All @@ -65,32 +79,33 @@ public void startGZIPCompressing(OutputStream out, InputStream in) {
}
}

private PartETag submitTaskForUploading(
private CompletedPart submitTaskForUploading(
String uploadFilename, ByteArrayInputStream inputStream, boolean isFinalPart) {
int eachPartId = uploadPartId.incrementAndGet();
UploadPartRequest uploadRequest =
new UploadPartRequest()
.withBucketName(config.lampiS3Bucket())
.withKey(uploadFilename)
.withUploadId(uploadId)
.withPartNumber(eachPartId)
.withPartSize(inputStream.available())
.withInputStream(inputStream);

if (isFinalPart) {
uploadRequest.withLastPart(true);
}
UploadPartRequest.builder()
.bucket(config.lampiS3Bucket())
.key(uploadFilename)
.uploadId(uploadId)
.partNumber(eachPartId)
.build();

RequestBody requestBody = RequestBody.fromInputStream(inputStream, inputStream.available());

LOG.info(
"Lähetetään tiedoston {} palanen {} jonka koko on {}",
uploadFilename,
eachPartId,
inputStream.available());

UploadPartResult uploadResult = lampiS3Client.uploadPart(uploadRequest);
UploadPartResponse uploadPartResponse = lampiS3Client.uploadPart(uploadRequest, requestBody);

LOG.info("Lähetetty tiedoston {} palanen {}", uploadFilename, eachPartId);
return uploadResult.getPartETag();

CompletedPart completedPart =
CompletedPart.builder().partNumber(eachPartId).eTag(uploadPartResponse.eTag()).build();

return completedPart;
}

public String transferToLampi(String filename, String uploadFilename, int numberOfFiles)
Expand All @@ -101,17 +116,20 @@ public String transferToLampi(String filename, String uploadFilename, int number
filename,
numberOfFiles);

ObjectMetadata metadata = new ObjectMetadata();
metadata.setContentType("text/csv");
metadata.setContentEncoding("gzip");
InitiateMultipartUploadRequest initRequest =
new InitiateMultipartUploadRequest(config.lampiS3Bucket(), uploadFilename)
.withObjectMetadata(metadata);
InitiateMultipartUploadResult initResult = lampiS3Client.initiateMultipartUpload(initRequest);
CreateMultipartUploadRequest createRequest =
CreateMultipartUploadRequest.builder()
.bucket(config.lampiS3Bucket())
.key(uploadFilename)
.contentType("text/csv")
.contentEncoding("gzip")
.build();

CreateMultipartUploadResponse createResponse =
lampiS3Client.createMultipartUpload(createRequest);

uploadId = initResult.getUploadId();
uploadId = createResponse.uploadId();

List<Supplier<S3ObjectInputStream>> streamsFList =
List<Supplier<ResponseInputStream<GetObjectResponse>>> streamsFList =
IntStream.rangeClosed(1, numberOfFiles)
.mapToObj(
fileNumber -> {
Expand All @@ -121,7 +139,7 @@ public String transferToLampi(String filename, String uploadFilename, int number
})
.toList();

Enumeration<Supplier<S3ObjectInputStream>> streams =
Enumeration<Supplier<ResponseInputStream<GetObjectResponse>>> streams =
Iterators.asEnumeration(streamsFList.iterator());

InputStream multiInputStream = new MultiInputStream(streams);
Expand All @@ -136,7 +154,7 @@ public String transferToLampi(String filename, String uploadFilename, int number
int bytesRead, bytesAdded = 0;
byte[] data = new byte[UPLOAD_PART_SIZE];
ByteArrayOutputStream bufferOutputStream = new ByteArrayOutputStream();
List<PartETag> parts = new ArrayList<>();
List<CompletedPart> completedParts = new ArrayList<>();

while ((bytesRead = pipedInputStream.read(data, 0, data.length)) != -1) {
bufferOutputStream.write(data, 0, bytesRead);
Expand All @@ -145,41 +163,47 @@ public String transferToLampi(String filename, String uploadFilename, int number
bytesAdded += bytesRead;
continue;
}
PartETag partETag =
CompletedPart completedPart =
submitTaskForUploading(
uploadFilename, new ByteArrayInputStream(bufferOutputStream.toByteArray()), false);
parts.add(partETag);
completedParts.add(completedPart);
bufferOutputStream.reset(); // flush the bufferOutputStream
bytesAdded = 0; // reset the bytes added to 0
}

PartETag partETag =
CompletedPart completedPart =
submitTaskForUploading(
uploadFilename, new ByteArrayInputStream(bufferOutputStream.toByteArray()), true);
parts.add(partETag);
completedParts.add(completedPart);

CompleteMultipartUploadRequest completeRequest =
new CompleteMultipartUploadRequest(config.lampiS3Bucket(), uploadFilename, uploadId, parts);
CompleteMultipartUploadResult completeMultipartUploadResult =
CompleteMultipartUploadRequest.builder()
.bucket(config.lampiS3Bucket())
.key(uploadFilename)
.uploadId(uploadId)
.multipartUpload(CompletedMultipartUpload.builder().parts(completedParts).build())
.build();
CompleteMultipartUploadResponse completeMultipartUploadResult =
lampiS3Client.completeMultipartUpload(completeRequest);

LOG.info("Tiedoston {} lähettäminen Lammen S3-ämpäriin valmistui", filename);

return completeMultipartUploadResult.getVersionId();
return completeMultipartUploadResult.versionId();
}

public void uploadManifest(List<ManifestItem> manifestItems) {
public void uploadManifest(List<ManifestItem> manifestItems) throws Exception {
String uploadFilename = config.lampiKeyPrefix() + MANIFEST_FILENAME;

String json = gson.toJson(manifestItems);
InputStream inputStream = new ByteArrayInputStream(json.getBytes(StandardCharsets.UTF_8));

LOG.info("Manifest: {}", json);

ObjectMetadata metadata = new ObjectMetadata();
PutObjectRequest putObjectRequest =
new PutObjectRequest(config.lampiS3Bucket(), uploadFilename, inputStream, metadata);
PutObjectRequest.builder().bucket(config.lampiS3Bucket()).key(uploadFilename).build();

RequestBody requestBody = RequestBody.fromInputStream(inputStream, inputStream.available());

lampiS3Client.putObject(putObjectRequest);
lampiS3Client.putObject(putObjectRequest, requestBody);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
<configuration>
<!-- Logback configuration for the lampi-siirtaja container: console-only logging. -->

<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<!-- %d with an explicit zone argument renders timestamps in Europe/Helsinki
     local time (ISO-8601 with zone offset via X) regardless of the host TZ. -->
<pattern>%d{yyyy-MM-dd'T'HH:mm:ss.SSSX, Europe/Helsinki} [%t] %-5level %c: %m%n</pattern>
</encoder>
</appender>

<!-- Default level: info, everything to stdout (container-friendly). -->
<root level="info">
<appender-ref ref="STDOUT" />
</root>

<!-- Raise the AWS SDK v2 loggers to warn to suppress its info-level chatter;
     warnings and errors still surface. -->
<logger name="software.amazon.awssdk" level="warn" />
</configuration>

0 comments on commit a8b1734

Please sign in to comment.