Skip to content

Commit

Permalink
[SNOW-776734] Fix SnowflakeFileTransferAgent.uploadStream() overwrite… (
Browse files Browse the repository at this point in the history
#1340)

[SNOW-776734] Fix SnowflakeFileTransferAgent.uploadStream() overwrite behaviour
  • Loading branch information
sfc-gh-skumbham authored Apr 6, 2023
1 parent ee0ee61 commit 77f3967
Show file tree
Hide file tree
Showing 2 changed files with 99 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -1391,6 +1391,16 @@ public boolean execute() throws SQLException {
/** Helper to upload data from a stream */
private void uploadStream() throws SnowflakeSQLException {
try {
FileMetadata fileMetadata = fileMetadataMap.get(SRC_FILE_NAME_FOR_STREAM);

if (fileMetadata.resultStatus == ResultStatus.SKIPPED) {
logger.debug(
"Skipping {}, status: {}, details: {}",
SRC_FILE_NAME_FOR_STREAM,
fileMetadata.resultStatus,
fileMetadata.errorDetails);
return;
}
threadExecutor = SnowflakeUtil.createDefaultExecutorService("sf-stream-upload-worker-", 1);

RemoteStoreFileEncryptionMaterial encMat = encryptionMaterial.get(0);
Expand All @@ -1401,7 +1411,7 @@ private void uploadStream() throws SnowflakeSQLException {
getUploadFileCallable(
stageInfo,
SRC_FILE_NAME_FOR_STREAM,
fileMetadataMap.get(SRC_FILE_NAME_FOR_STREAM),
fileMetadata,
(stageInfo.getStageType() == StageInfo.StageType.LOCAL_FS)
? null
: storageFactory.createClient(stageInfo, parallel, encMat, session),
Expand Down
88 changes: 88 additions & 0 deletions src/test/java/net/snowflake/client/jdbc/FileUploaderLatestIT.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.security.NoSuchAlgorithmException;
import java.sql.*;
import java.util.List;
Expand Down Expand Up @@ -651,6 +653,70 @@ public void testUploadFileCallableFileNotFound() throws Exception {
SnowflakeFileTransferAgent.setInjectedFileTransferException(null);
}

@Test
public void testUploadFileStreamWithNoOverwrite() throws Exception {
Connection connection = null;

try {
connection = getConnection();
Statement statement = connection.createStatement();
statement.execute("CREATE OR REPLACE STAGE testStage");

uploadFileToStageUsingStream(connection, false);
ResultSet resultSet = statement.executeQuery("LIST @testStage");
resultSet.next();
String expectedValue = resultSet.getString("last_modified");

Thread.sleep(1000); // add 1 sec delay between uploads.

uploadFileToStageUsingStream(connection, false);
resultSet = statement.executeQuery("LIST @testStage");
resultSet.next();
String actualValue = resultSet.getString("last_modified");

assertTrue(expectedValue.equals(actualValue));
} catch (Exception e) {
Assert.fail("testUploadFileStreamWithNoOverwrite failed " + e.getMessage());
} finally {
if (connection != null) {
connection.createStatement().execute("DROP STAGE if exists testStage");
connection.close();
}
}
}

@Test
public void testUploadFileStreamWithOverwrite() throws Exception {
Connection connection = null;

try {
connection = getConnection();
Statement statement = connection.createStatement();
statement.execute("CREATE OR REPLACE STAGE testStage");

uploadFileToStageUsingStream(connection, true);
ResultSet resultSet = statement.executeQuery("LIST @testStage");
resultSet.next();
String expectedValue = resultSet.getString("last_modified");

Thread.sleep(1000); // add 1 sec delay between uploads.

uploadFileToStageUsingStream(connection, true);
resultSet = statement.executeQuery("LIST @testStage");
resultSet.next();
String actualValue = resultSet.getString("last_modified");

assertFalse(expectedValue.equals(actualValue));
} catch (Exception e) {
Assert.fail("testUploadFileStreamWithNoOverwrite failed " + e.getMessage());
} finally {
if (connection != null) {
connection.createStatement().execute("DROP STAGE if exists testStage");
connection.close();
}
}
}

@Test
@ConditionalIgnoreRule.ConditionalIgnore(condition = RunningOnGithubAction.class)
public void testGetS3StorageObjectMetadata() throws Throwable {
Expand Down Expand Up @@ -722,4 +788,26 @@ public void testGetS3StorageObjectMetadata() throws Throwable {
}
}
}

private void uploadFileToStageUsingStream(Connection connection, boolean overwrite)
throws Exception {
SFSession sfSession = connection.unwrap(SnowflakeConnectionV1.class).getSfSession();
String sourceFilePath = getFullPathFileInResource(TEST_DATA_FILE);

String putCommand = "PUT file://" + sourceFilePath + " @testStage";

if (overwrite) {
putCommand += " overwrite=true";
}

SnowflakeFileTransferAgent sfAgent =
new SnowflakeFileTransferAgent(putCommand, sfSession, new SFStatement(sfSession));

InputStream is = Files.newInputStream(Paths.get(sourceFilePath));

sfAgent.setSourceStream(is);
sfAgent.setDestFileNameForStreamSource("test_file");

sfAgent.execute();
}
}

0 comments on commit 77f3967

Please sign in to comment.