diff --git a/src/main/java/net/snowflake/client/jdbc/cloud/storage/SnowflakeGCSClient.java b/src/main/java/net/snowflake/client/jdbc/cloud/storage/SnowflakeGCSClient.java index cf00c3210..4f2b371a4 100644 --- a/src/main/java/net/snowflake/client/jdbc/cloud/storage/SnowflakeGCSClient.java +++ b/src/main/java/net/snowflake/client/jdbc/cloud/storage/SnowflakeGCSClient.java @@ -18,6 +18,7 @@ import java.io.*; import java.net.SocketTimeoutException; import java.net.URISyntaxException; +import java.nio.channels.Channels; import java.security.InvalidKeyException; import java.util.*; import java.util.Map.Entry; @@ -448,9 +449,7 @@ public InputStream downloadToStream( 404, // because blob not found "Blob" + blobId.getName() + " not found in bucket " + blobId.getBucket())); } - - inputStream = new ByteArrayInputStream(blob.getContent()); - + inputStream = Channels.newInputStream(blob.reader()); if (isEncrypting()) { // Get the user-defined BLOB metadata Map userDefinedMetadata = blob.getMetadata(); @@ -776,11 +775,17 @@ private void uploadWithPresignedUrl( SqlState.INTERNAL_ERROR, "Unexpected: upload presigned URL invalid"); } catch (Exception e) { + StringWriter sw = new StringWriter(); + e.printStackTrace(new PrintWriter(sw)); + String stackTrace = sw.toString(); throw new SnowflakeSQLLoggedException( session, ErrorCode.INTERNAL_ERROR.getMessageCode(), SqlState.INTERNAL_ERROR, - "Unexpected: upload with presigned url failed"); + "Unexpected: upload with presigned url failed. Ex: " + + e.getMessage() + + ", stacktrace: " + + stackTrace); } } diff --git a/src/test/java/net/snowflake/client/jdbc/SnowflakeDriverLatestIT.java b/src/test/java/net/snowflake/client/jdbc/SnowflakeDriverLatestIT.java index acf269800..dafcf9404 100644 --- a/src/test/java/net/snowflake/client/jdbc/SnowflakeDriverLatestIT.java +++ b/src/test/java/net/snowflake/client/jdbc/SnowflakeDriverLatestIT.java @@ -1443,4 +1443,53 @@ public void testNoSpaceLeftOnDeviceException() throws SQLException { } } } + + @Test + @ConditionalIgnoreRule.ConditionalIgnore(condition = RunningOnGithubAction.class) + public void testUploadWithGCSPresignedUrlWithoutConnection() throws Throwable { + Connection connection = null; + File destFolder = tmpFolder.newFolder(); + String destFolderCanonicalPath = destFolder.getCanonicalPath(); + try { + connection = getConnection("gcpaccount"); + Statement statement = connection.createStatement(); + + // create a stage to put the file in + statement.execute("CREATE OR REPLACE STAGE " + testStageName); + + SFSession sfSession = connection.unwrap(SnowflakeConnectionV1.class).getSfSession(); + + // Test put file with internal compression + String putCommand = "put file:///dummy/path/file1.gz @" + testStageName; + SnowflakeFileTransferAgent sfAgent = + new SnowflakeFileTransferAgent(putCommand, sfSession, new SFStatement(sfSession)); + List metadata = sfAgent.getFileTransferMetadatas(); + + String srcPath = getFullPathFileInResource(TEST_DATA_FILE); + for (SnowflakeFileTransferMetadata oneMetadata : metadata) { + InputStream inputStream = new FileInputStream(srcPath); + + assert (oneMetadata.isForOneFile()); + SnowflakeFileTransferAgent.uploadWithoutConnection( + SnowflakeFileTransferConfig.Builder.newInstance() + .setSnowflakeFileTransferMetadata(oneMetadata) + .setUploadStream(inputStream) + .setRequireCompress(true) + .setNetworkTimeoutInMilli(0) + .setOcspMode(OCSPMode.FAIL_OPEN) + .build()); + } + + assertTrue( + "Failed to get files", + statement.execute( + "GET @" + testStageName + " 'file://" + destFolderCanonicalPath + "/' parallel=8")); + assert (isFileContentEqual(srcPath, false, destFolderCanonicalPath + "/file1.gz", true)); + } finally { + if (connection != null) { + connection.createStatement().execute("DROP STAGE if exists " + testStageName); + connection.close(); + } + } + } } diff --git a/src/test/java/net/snowflake/client/jdbc/StreamLatestIT.java b/src/test/java/net/snowflake/client/jdbc/StreamLatestIT.java index 139d47f9d..28cb6b8a8 100644 --- a/src/test/java/net/snowflake/client/jdbc/StreamLatestIT.java +++ b/src/test/java/net/snowflake/client/jdbc/StreamLatestIT.java @@ -5,6 +5,9 @@ import static org.junit.Assert.*; +import java.io.IOException; +import java.io.InputStream; +import java.io.StringWriter; import java.nio.charset.StandardCharsets; import java.sql.Connection; import java.sql.ResultSet; @@ -14,6 +17,8 @@ import net.snowflake.client.ConditionalIgnoreRule; import net.snowflake.client.RunningOnGithubAction; import net.snowflake.client.category.TestCategoryOthers; +import org.apache.commons.io.IOUtils; +import org.junit.Ignore; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -115,4 +120,80 @@ public void testDownloadToStreamBlobNotFoundGCS() throws SQLException { closeSQLObjects(statement, connection); } } + + @Test + @Ignore + public void testDownloadToStreamGCSPresignedUrl() throws SQLException, IOException { + final String DEST_PREFIX = "testUploadStream"; + Connection connection = null; + Statement statement = null; + connection = getConnection("gcpaccount"); + statement = connection.createStatement(); + statement.execute("create or replace stage testgcpstage"); + ResultSet rset = + statement.executeQuery( + "PUT file://" + + getFullPathFileInResource(TEST_DATA_FILE) + + " @testgcpstage/" + + DEST_PREFIX); + assertTrue(rset.next()); + assertEquals("Error message:" + rset.getString(8), "UPLOADED", rset.getString(7)); + + InputStream out = + connection + .unwrap(SnowflakeConnection.class) + .downloadStream("@testgcpstage", DEST_PREFIX + "/" + TEST_DATA_FILE + ".gz", true); + StringWriter writer = new StringWriter(); + IOUtils.copy(out, writer, "UTF-8"); + String output = writer.toString(); + // the first 2 characters + assertEquals("1|", output.substring(0, 2)); + + // the number of lines + String[] lines = output.split("\n"); + assertEquals(28, lines.length); + + statement.execute("rm @~/" + DEST_PREFIX); + statement.close(); + closeSQLObjects(statement, connection); + } + + @Test + @ConditionalIgnoreRule.ConditionalIgnore(condition = RunningOnGithubAction.class) + public void testDownloadToStreamGCS() throws SQLException, IOException { + final String DEST_PREFIX = TEST_UUID + "/testUploadStream"; + Connection connection = null; + Statement statement = null; + Properties paramProperties = new Properties(); + paramProperties.put("GCS_USE_DOWNSCOPED_CREDENTIAL", true); + try { + connection = getConnection("gcpaccount", paramProperties); + statement = connection.createStatement(); + ResultSet rset = + statement.executeQuery( + "PUT file://" + getFullPathFileInResource(TEST_DATA_FILE) + " @~/" + DEST_PREFIX); + assertTrue(rset.next()); + assertEquals("UPLOADED", rset.getString(7)); + + InputStream out = + connection + .unwrap(SnowflakeConnection.class) + .downloadStream("~", DEST_PREFIX + "/" + TEST_DATA_FILE + ".gz", true); + StringWriter writer = new StringWriter(); + IOUtils.copy(out, writer, "UTF-8"); + String output = writer.toString(); + // the first 2 characters + assertEquals("1|", output.substring(0, 2)); + + // the number of lines + String[] lines = output.split("\n"); + assertEquals(28, lines.length); + } finally { + if (statement != null) { + statement.execute("rm @~/" + DEST_PREFIX); + statement.close(); + } + closeSQLObjects(statement, connection); + } + } }