Skip to content

Commit

Permalink
SNOW-1890076: Ensure correct file is downloaded as stream (#2043)
Browse files Browse the repository at this point in the history
  • Loading branch information
sfc-gh-dprzybysz authored Jan 27, 2025
1 parent 979a161 commit 2355c5a
Show file tree
Hide file tree
Showing 6 changed files with 100 additions and 1 deletion.
3 changes: 2 additions & 1 deletion src/main/java/net/snowflake/client/jdbc/ErrorCode.java
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,8 @@ public enum ErrorCode {
GCP_SERVICE_ERROR(200061, SqlState.SYSTEM_ERROR),
AUTHENTICATOR_REQUEST_TIMEOUT(200062, SqlState.CONNECTION_EXCEPTION),
INVALID_STRUCT_DATA(200063, SqlState.DATA_EXCEPTION),
DISABLEOCSP_INSECUREMODE_VALUE_MISMATCH(200064, SqlState.INVALID_PARAMETER_VALUE);
DISABLEOCSP_INSECUREMODE_VALUE_MISMATCH(200064, SqlState.INVALID_PARAMETER_VALUE),
TOO_MANY_FILES_TO_DOWNLOAD_AS_STREAM(200065, SqlState.DATA_EXCEPTION);

public static final String errorMessageResource = "net.snowflake.client.jdbc.jdbc_error_messages";

Expand Down
11 changes: 11 additions & 0 deletions src/main/java/net/snowflake/client/jdbc/SnowflakeConnectionV1.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Pattern;
import java.util.zip.GZIPInputStream;
import net.snowflake.client.core.SFBaseSession;
import net.snowflake.client.core.SFException;
Expand All @@ -61,12 +62,14 @@ public class SnowflakeConnectionV1 implements Connection, SnowflakeConnection {

/** Refer to all created and open statements from this connection */
private final Set<Statement> openStatements = ConcurrentHashMap.newKeySet();

// Injected delay for the purpose of connection timeout testing
// Any statement execution will sleep for the specified number of milliseconds
private final AtomicInteger _injectedDelay = new AtomicInteger(0);
private boolean isClosed;
private SQLWarning sqlWarnings = null;
private List<DriverPropertyInfo> missingProperties = null;

/**
* Amount of milliseconds a user is willing to tolerate for network related issues (e.g. HTTP
* 503/504) or database transient issues (e.g. GS not responding)
Expand All @@ -76,12 +79,14 @@ public class SnowflakeConnectionV1 implements Connection, SnowflakeConnection {
* <p>Default: 300 seconds
*/
private int networkTimeoutInMilli = 0; // in milliseconds

/* this should be set to Connection.TRANSACTION_READ_COMMITTED
* There may not be many implications here since the call to
* setTransactionIsolation doesn't do anything.
*/
private int transactionIsolation = Connection.TRANSACTION_NONE;
private SFBaseSession sfSession;

/** The SnowflakeConnectionImpl that provides the underlying physical-layer implementation */
private SFConnectionHandler sfConnectionHandler;

Expand Down Expand Up @@ -1037,6 +1042,12 @@ public InputStream downloadStream(String stageName, String sourceFileName, boole
// no file will be downloaded to this location
getCommand.append(" file:///tmp/ /*jdbc download stream*/");

// We cannot match the whole sourceFileName since it may differ (e.g. for git repositories), so
// we match only the raw file name
String[] split = sourceFileName.split("/");
String fileName = Pattern.quote(split[split.length - 1]);
getCommand.append(" PATTERN=\".*").append(fileName).append("$\"");

SFBaseFileTransferAgent transferAgent =
sfConnectionHandler.getFileTransferAgent(getCommand.toString(), stmt.getSFBaseStatement());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1696,6 +1696,18 @@ public InputStream downloadStream(String fileName) throws SnowflakeSQLException
remoteLocation remoteLocation = extractLocationAndPath(stageInfo.getLocation());

// When downloading a file as a stream there should be exactly one file in the source files.
// Fail fast when more than one file matches instead of fetching an arbitrary one.
if (sourceFiles.size() > 1) {
throw new SnowflakeSQLException(
queryID,
SqlState.NO_DATA,
ErrorCode.TOO_MANY_FILES_TO_DOWNLOAD_AS_STREAM.getMessageCode(),
session,
"There are more than one file matching "
+ fileName
+ ": "
+ String.join(",", sourceFiles));
}
String sourceLocation =
sourceFiles.stream()
.findFirst()
Expand Down
73 changes: 73 additions & 0 deletions src/test/java/net/snowflake/client/jdbc/StreamLatestIT.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,22 @@
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.StringWriter;
import java.nio.charset.StandardCharsets;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Properties;
import java.util.UUID;
import java.util.stream.Collectors;
import net.snowflake.client.annotations.DontRunOnGithubActions;
import net.snowflake.client.category.TestTags;
import org.apache.commons.io.IOUtils;
Expand Down Expand Up @@ -237,4 +241,73 @@ public void testSpecialCharactersInFileName() throws SQLException, IOException {
}
}
}

/**
 * Checks that {@code downloadStream} returns the requested file when the stage directory holds
 * several files sharing the same name prefix ({@code test_file}, {@code test_file.csv} and their
 * gzipped variants).
 *
 * <p>Added > 3.21.0. Fixed regression introduced in 3.19.1
 */
@Test
public void shouldDownloadStreamInDeterministicWay() throws Exception {
  try (Connection connection = getConnection();
      Statement statement = connection.createStatement()) {
    String stage = "test" + UUID.randomUUID().toString().replaceAll("-", "");
    try {
      statement.execute("CREATE OR REPLACE STAGE " + stage);
      String directory = UUID.randomUUID().toString();
      String plainFilePath = getFullPathFileInResource("test_file");
      String csvFilePath = getFullPathFileInResource("test_file.csv");
      String destination = String.format("@%s/%s", stage, directory);

      // Upload both files uncompressed first, then again with AUTO_COMPRESS enabled, so that
      // four similarly named files end up in the same stage directory.
      for (boolean autoCompress : new boolean[] {false, true}) {
        putFile(statement, csvFilePath, destination, autoCompress);
        putFile(statement, plainFilePath, destination, autoCompress);
      }
      expectsFilesOnStage(statement, destination, 4);

      String stageReference = "@" + stage;
      String plainContent = "I am a file without extension";
      String csvContent = "I am a file with extension";
      downloadStreamExpectingContent(
          connection, stageReference, directory + "/test_file.gz", true, plainContent);
      downloadStreamExpectingContent(
          connection, stageReference, directory + "/test_file.csv.gz", true, csvContent);
      downloadStreamExpectingContent(
          connection, stageReference, directory + "/test_file", false, plainContent);
      downloadStreamExpectingContent(
          connection, stageReference, directory + "/test_file.csv", false, csvContent);
    } finally {
      // Always remove the temporary stage, even when an assertion above fails.
      statement.execute("DROP STAGE IF EXISTS " + stage);
    }
  }
}

/**
 * Downloads a single staged file as a stream and asserts that its text content equals the
 * expected value.
 *
 * <p>The stream is read line by line and the lines are joined with {@code "\n"}, so a trailing
 * line terminator in the staged file is not part of the compared content.
 *
 * @param conn connection that can be unwrapped to {@link SnowflakeConnectionV1}
 * @param stageName stage to download from, passed through to {@code downloadStream}
 * @param fileName path of the file within the stage, passed through to {@code downloadStream}
 * @param decompress value of the {@code decompress} flag passed to {@code downloadStream}
 * @param expectedFileContent text the downloaded file must contain
 * @throws IOException if closing the stream or readers fails
 * @throws SQLException if unwrapping the connection or downloading the stream fails
 */
private static void downloadStreamExpectingContent(
    Connection conn,
    String stageName,
    String fileName,
    boolean decompress,
    String expectedFileContent)
    throws IOException, SQLException {
  // Decode explicitly as UTF-8: the single-argument InputStreamReader constructor uses the
  // platform default charset, which would make this assertion depend on the JVM environment.
  try (InputStream inputStream =
          conn.unwrap(SnowflakeConnectionV1.class)
              .downloadStream(stageName, fileName, decompress);
      InputStreamReader isr = new InputStreamReader(inputStream, StandardCharsets.UTF_8);
      BufferedReader br = new BufferedReader(isr)) {
    String content = br.lines().collect(Collectors.joining("\n"));
    assertEquals(expectedFileContent, content);
  }
}

/**
 * Runs {@code LIST} against the given stage location and asserts how many rows it returns.
 *
 * @param stmt statement used to execute the {@code LIST} query
 * @param stageDest stage location appended to the {@code LIST} command
 * @param expectCount number of rows the listing is expected to contain
 * @throws SQLException if executing the query or iterating the result fails
 */
private static void expectsFilesOnStage(Statement stmt, String stageDest, int expectCount)
    throws SQLException {
  int listedFiles = 0;
  try (ResultSet listing = stmt.executeQuery("LIST " + stageDest)) {
    // One result row per listed file; only the row count matters here.
    for (boolean hasRow = listing.next(); hasRow; hasRow = listing.next()) {
      listedFiles++;
    }
  }
  assertEquals(expectCount, listedFiles);
}

/**
 * Executes a {@code PUT} command that uploads a local file to a stage location.
 *
 * @param stmt statement used to execute the command
 * @param localFileName local path placed after the {@code file://} prefix
 * @param stageDest target stage location
 * @param autoCompress value rendered (upper-cased) into the {@code AUTO_COMPRESS} option
 * @return the result of {@link Statement#execute(String)} for the generated command
 * @throws SQLException if executing the command fails
 */
private static boolean putFile(
    Statement stmt, String localFileName, String stageDest, boolean autoCompress)
    throws SQLException {
  String autoCompressValue = String.valueOf(autoCompress).toUpperCase();
  String putCommand =
      String.format(
          "PUT file://%s %s AUTO_COMPRESS=%s", localFileName, stageDest, autoCompressValue);
  return stmt.execute(putCommand);
}
}
1 change: 1 addition & 0 deletions src/test/resources/test_file
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
I am a file without extension
1 change: 1 addition & 0 deletions src/test/resources/test_file.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
I am a file with extension

0 comments on commit 2355c5a

Please sign in to comment.